diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java index 30db2094bef83d1060523380ad0f2b01cd2e7007..86a6cec11c52f44706bd1af7227ffacf77b5e05d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java @@ -39,6 +39,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { super(topic, totalQueues, bname, epoch); } + + public static boolean putMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId, List mappingInfo) { if (mappingInfo.isEmpty()) { return true; diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index ea6bc611784417d1c94efc9b3524daaf9205f547..ef565a0bed9a362edf9e7abc1c26d017dc8e25dc 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -39,12 +39,15 @@ public class TopicQueueMappingUtils { public static class MappingAllocator { Map brokerNumMap = new HashMap(); Map idToBroker = new HashMap(); + //used for remapping + Map brokerNumMapBeforeRemapping = null; int currentIndex = 0; Random random = new Random(); List leastBrokers = new ArrayList(); - private MappingAllocator(Map idToBroker, Map brokerNumMap) { + private MappingAllocator(Map idToBroker, Map brokerNumMap, Map brokerNumMapBeforeRemapping) { this.idToBroker.putAll(idToBroker); this.brokerNumMap.putAll(brokerNumMap); + this.brokerNumMapBeforeRemapping = brokerNumMapBeforeRemapping; } private void freshState() { @@ -58,7 +61,27 @@ public class TopicQueueMappingUtils { leastBrokers.add(entry.getKey()); } } - currentIndex = random.nextInt(leastBrokers.size()); + //reduce the remapping + if (brokerNumMapBeforeRemapping != null + && !brokerNumMapBeforeRemapping.isEmpty()) { + Collections.sort(leastBrokers, new Comparator() { + @Override + public int compare(String o1, String o2) { + int i1 = 0, i2 = 0; + if (brokerNumMapBeforeRemapping.containsKey(o1)) { + i1 = brokerNumMapBeforeRemapping.get(o1); + } + if (brokerNumMapBeforeRemapping.containsKey(o2)) { + i2 = brokerNumMapBeforeRemapping.get(o2); + } + return i1 - i2; + } + }); + } else { + //reduce the imbalance + Collections.shuffle(leastBrokers); + } + currentIndex = leastBrokers.size() - 1; } private String nextBroker() { if (leastBrokers.isEmpty()) { @@ -93,8 +116,9 @@ public class TopicQueueMappingUtils { } } - public static MappingAllocator buildMappingAllocator(Map idToBroker, Map brokerNumMap) { - return new MappingAllocator(idToBroker, brokerNumMap); + + public static MappingAllocator buildMappingAllocator(Map idToBroker, Map brokerNumMap, Map brokerNumMapBeforeRemapping) { + return new MappingAllocator(idToBroker, brokerNumMap, brokerNumMapBeforeRemapping); } public static Map.Entry findMaxEpochAndQueueNum(List mappingDetailList) { @@ -367,16 +391,28 @@ public class TopicQueueMappingUtils { } } - public static void checkIfTargetBrokersComplete(Set targetBrokers, Map brokerConfigMap) { + public static void checkTargetBrokersComplete(Set targetBrokers, Map brokerConfigMap) { for (String broker : brokerConfigMap.keySet()) { + if (brokerConfigMap.get(broker).getMappingDetail().getHostedQueues().isEmpty()) { + continue; + } if (!targetBrokers.contains(broker)) { throw new RuntimeException("The existed broker " + broker + " dose not in target brokers "); } } } - public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set targetBrokers, Map brokerConfigMap) { - checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap); + public static void checkNonTargetBrokers(Set targetBrokers, Set nonTargetBrokers) { + for (String broker : nonTargetBrokers) { + if (targetBrokers.contains(broker)) { + throw new RuntimeException("The non-target broker exist in target broker"); + } + } + } + + public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set targetBrokers, Set nonTargetBrokers, Map brokerConfigMap) { + checkTargetBrokersComplete(targetBrokers, brokerConfigMap); + checkNonTargetBrokers(targetBrokers, nonTargetBrokers); Map globalIdMap = new HashMap(); Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry(System.currentTimeMillis(), queueNum); if (!brokerConfigMap.isEmpty()) { @@ -408,7 +444,7 @@ public class TopicQueueMappingUtils { brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1); } } - TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap); + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap, null); allocator.upToNum(queueNum); Map newIdToBroker = allocator.getIdToBroker(); @@ -436,6 +472,12 @@ public class TopicQueueMappingUtils { TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList(Collections.singletonList(mappingItem))); } + //set the non target brokers + for (String broker : nonTargetBrokers) { + if (!brokerConfigMap.containsKey(broker)) { + brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch))); + } + } // set the topic config for (Map.Entry entry : brokerConfigMap.entrySet()) { TopicConfigAndQueueMapping configMapping = entry.getValue(); @@ -466,10 +508,20 @@ public class TopicQueueMappingUtils { for (String broker: targetBrokers) { brokerNumMap.put(broker, 0); } - TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap(), brokerNumMap); + Map brokerNumMapBeforeRemapping = new HashMap(); + for (TopicQueueMappingOne mappingOne: globalIdMap.values()) { + if(brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) { + brokerNumMapBeforeRemapping.put(mappingOne.bname, brokerNumMapBeforeRemapping.get(mappingOne.bname) + 1); + } else { + brokerNumMapBeforeRemapping.put(mappingOne.bname, 1); + } + } + + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap(), brokerNumMap, brokerNumMapBeforeRemapping); allocator.upToNum(maxNum); Map expectedBrokerNumMap = allocator.getBrokerNumMap(); Queue waitAssignQueues = new ArrayDeque(); + //cannot directly use the idBrokerMap from allocator, for the number of globalId maybe not in the natural order Map expectedIdToBroker = new HashMap(); //the following logic will make sure that, for one broker, either "map in" or "map out" //It can't both, map in some queues but also map out some queues. diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java index 1b0ad54c657b471cb793b358445074d44e133efe..bd4b13ca71ca7464472f1c85eaca5a357b5fe1d5 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java @@ -10,6 +10,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; public class TopicMappingUtilsTest { @@ -35,6 +36,18 @@ public class TopicMappingUtilsTest { return map; } + private Map buildBrokerNumMap(int num, int queues) { + Map map = new HashMap(); + int random = new Random().nextInt(num); + for (int i = 0; i < num; i++) { + map.put("broker" + i, queues); + if (i == random) { + map.put("broker" + i, queues + 1); + } + } + return map; + } + private void testIdToBroker(Map idToBroker, Map brokerNumMap) { Map brokerNumOther = new HashMap(); for (int i = 0; i < idToBroker.size(); i++) { @@ -58,7 +71,7 @@ public class TopicMappingUtilsTest { for (int i = 0; i < 10; i++) { int num = 3; Map brokerNumMap = buildBrokerNumMap(num); - TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap(), brokerNumMap); + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap(), brokerNumMap, null); allocator.upToNum(num * 2); for (Map.Entry entry: allocator.getBrokerNumMap().entrySet()) { Assert.assertEquals(2L, entry.getValue().longValue()); @@ -77,6 +90,18 @@ public class TopicMappingUtilsTest { } } + @Test + public void testRemappingAllocator() { + for (int i = 0; i < 10; i++) { + int num = (i + 2) * 2; + Map brokerNumMap = buildBrokerNumMap(num); + Map brokerNumMapBeforeRemapping = buildBrokerNumMap(num, num); + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap(), brokerNumMap, brokerNumMapBeforeRemapping); + allocator.upToNum(num * num + 1); + Assert.assertEquals(brokerNumMapBeforeRemapping, allocator.getBrokerNumMap()); + } + } + @Test(expected = RuntimeException.class) public void testTargetBrokersComplete() { @@ -86,8 +111,10 @@ public class TopicMappingUtilsTest { Set targetBrokers = new HashSet(); targetBrokers.add(broker1); Map brokerConfigMap = new HashMap(); - brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, 0, broker2, 0))); - TopicQueueMappingUtils.checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap); + TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker2, 0); + mappingDetail.getHostedQueues().put(1, new ArrayList()); + brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), mappingDetail)); + TopicQueueMappingUtils.checkTargetBrokersComplete(targetBrokers, brokerConfigMap); } @@ -99,28 +126,36 @@ public class TopicMappingUtilsTest { Map brokerConfigMap = new HashMap(); for (int i = 1; i < 10; i++) { Set targetBrokers = buildTargetBrokers(2 * i); + Set nonTargetBrokers = buildTargetBrokers(2 * i, "test"); queueNum = 10 * i; - TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, nonTargetBrokers, brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); - Assert.assertEquals(2 * i, brokerConfigMap.size()); + Assert.assertEquals(4 * i, brokerConfigMap.size()); //do the check manually - TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); + Map.Entry maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); + Assert.assertEquals(queueNum, maxEpochAndNum.getValue().longValue()); Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values()); TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap); for (Map.Entry entry : brokerConfigMap.entrySet()) { TopicConfigAndQueueMapping configMapping = entry.getValue(); - Assert.assertEquals(5, configMapping.getReadQueueNums()); - Assert.assertEquals(5, configMapping.getWriteQueueNums()); - Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis()); - for (List items: configMapping.getMappingDetail().getHostedQueues().values()) { - for (LogicQueueMappingItem item: items) { - Assert.assertEquals(0, item.getStartOffset()); - Assert.assertEquals(0, item.getLogicOffset()); - TopicConfig topicConfig = brokerConfigMap.get(item.getBname()); - Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums()); + if (nonTargetBrokers.contains(configMapping.getMappingDetail().bname)) { + Assert.assertEquals(0, configMapping.getReadQueueNums()); + Assert.assertEquals(0, configMapping.getWriteQueueNums()); + Assert.assertEquals(0, configMapping.getMappingDetail().getHostedQueues().size()); + } else { + Assert.assertEquals(5, configMapping.getReadQueueNums()); + Assert.assertEquals(5, configMapping.getWriteQueueNums()); + Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis()); + for (List items: configMapping.getMappingDetail().getHostedQueues().values()) { + for (LogicQueueMappingItem item: items) { + Assert.assertEquals(0, item.getStartOffset()); + Assert.assertEquals(0, item.getLogicOffset()); + TopicConfig topicConfig = brokerConfigMap.get(item.getBname()); + Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums()); + } } } } @@ -133,7 +168,7 @@ public class TopicMappingUtilsTest { int queueNum = 7; Map brokerConfigMap = new HashMap(); Set originalBrokers = buildTargetBrokers(2); - TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet(), brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); Assert.assertEquals(2, brokerConfigMap.size()); @@ -171,6 +206,27 @@ public class TopicMappingUtilsTest { } } + @Test + public void testRemappingStaticTopicStability() { + String topic = "static"; + int queueNum = 7; + Map brokerConfigMap = new HashMap(); + Set originalBrokers = buildTargetBrokers(2); + { + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet(), brokerConfigMap); + Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); + Assert.assertEquals(2, brokerConfigMap.size()); + } + for (int i = 0; i < 10; i++) { + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, originalBrokers); + Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); + Assert.assertEquals(2, brokerConfigMap.size()); + Assert.assertTrue(wrapper.getBrokerToMapIn().isEmpty()); + Assert.assertTrue(wrapper.getBrokerToMapOut().isEmpty()); + } + } + + @Test public void testUtilsCheck() { @@ -178,7 +234,7 @@ public class TopicMappingUtilsTest { int queueNum = 10; Map brokerConfigMap = new HashMap(); Set targetBrokers = buildTargetBrokers(2); - TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet(), brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); Assert.assertEquals(2, brokerConfigMap.size()); TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next(); diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index a5656f31283de8850ecd4d5473aa34292e284d4e..a4fa864b476549c76cb5e678df1cfdf89068b194 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -16,6 +16,7 @@ import org.junit.FixMethodOrder; import org.junit.Test; import java.util.ArrayList; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -45,7 +46,7 @@ public class StaticTopicIT extends BaseConf { public Map createStaticTopic(String topic, int queueNum, Set targetBrokers) throws Exception { Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); Assert.assertTrue(brokerConfigMap.isEmpty()); - TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); + TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap); Assert.assertEquals(2, brokerConfigMap.size()); //If some succeed, and others fail, it will cause inconsistent data for (Map.Entry entry : brokerConfigMap.entrySet()) { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 85e2cc5830ca2f3a09c33253a53053bcdd9dbffa..d6a680a27825268269f044c8ef45774b9b827823 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -217,7 +217,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } //calculate the new data - TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); + TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap); { String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);