From a493ca44d99c647364c8c1ebd0e6f786da28ff55 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 24 Nov 2021 16:32:49 +0800 Subject: [PATCH] Add the allocator test --- .../processor/AdminBrokerProcessor.java | 3 - .../body/TopicConfigSerializeWrapper.java | 5 +- .../statictopic/TopicQueueMappingUtils.java | 5 +- .../statictopic/TopicMappingUtilsTest.java | 61 +++++++++++++++++++ .../rocketmq/test/smoke/StaticTopicIT.java | 1 - 5 files changed, 66 insertions(+), 9 deletions(-) create mode 100644 common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 6fb95515..ab62415d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -338,9 +338,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements try { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - System.out.println("Broker body:" + new String(request.getBody())); - System.out.println("Broker bodetaildy:" + topicQueueMappingDetail.toJson()); - this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java index a71c50a6..ce123021 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java @@ -17,13 +17,12 @@ package org.apache.rocketmq.common.protocol.body; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - public class TopicConfigSerializeWrapper extends RemotingSerializable { private ConcurrentMap topicConfigTable = new ConcurrentHashMap(); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 6e6b15ba..5f843a99 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -49,11 +49,12 @@ public class TopicQueueMappingUtils { } private void freshState() { - int minNum = -1; + int minNum = Integer.MAX_VALUE; for (Map.Entry entry : brokerNumMap.entrySet()) { - if (entry.getValue() > minNum) { + if (entry.getValue() < minNum) { leastBrokers.clear(); leastBrokers.add(entry.getKey()); + minNum = entry.getValue(); } else if (entry.getValue() == minNum) { leastBrokers.add(entry.getKey()); } diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java new file mode 100644 index 00000000..6a61de5b --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java @@ -0,0 +1,61 @@ +package org.apache.rocketmq.common.statictopic; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class TopicMappingUtilsTest { + + + private Map buildBrokerNumMap(int num) { + Map map = new HashMap(); + for (int i = 0; i < num; i++) { + map.put("broker" + i, 0); + } + return map; + } + + private void testIdToBroker(Map idToBroker, Map brokerNumMap) { + Map brokerNumOther = new HashMap(); + for (int i = 0; i < idToBroker.size(); i++) { + Assert.assertTrue(idToBroker.containsKey(i)); + String broker = idToBroker.get(i); + if (brokerNumOther.containsKey(broker)) { + brokerNumOther.put(broker, brokerNumOther.get(broker) + 1); + } else { + brokerNumOther.put(broker, 1); + } + } + Assert.assertEquals(brokerNumMap.size(), brokerNumOther.size()); + for (Map.Entry entry: brokerNumOther.entrySet()) { + Assert.assertEquals(entry.getValue(), brokerNumMap.get(entry.getKey())); + } + } + + @Test + public void testAllocator() { + //stability + for (int i = 0; i < 10; i++) { + int num = 3; + Map brokerNumMap = buildBrokerNumMap(num); + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap(), brokerNumMap); + allocator.upToNum(num * 2); + for (Map.Entry entry: allocator.getBrokerNumMap().entrySet()) { + Assert.assertEquals(2L, entry.getValue().longValue()); + } + Assert.assertEquals(num * 2, allocator.getIdToBroker().size()); + testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap()); + + allocator.upToNum(num * 3 - 1); + + for (Map.Entry entry: allocator.getBrokerNumMap().entrySet()) { + Assert.assertTrue(entry.getValue() >= 2); + Assert.assertTrue(entry.getValue() <= 3); + } + Assert.assertEquals(num * 3 - 1, allocator.getIdToBroker().size()); + testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap()); + } + } +} diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index dac33d0e..bb21dc29 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -57,7 +57,6 @@ public class StaticTopicIT extends BaseConf { String broker = entry.getKey(); String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = entry.getValue(); - System.out.println(configMapping.getMappingDetail().toJson()); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); } } -- GitLab