diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 6fb9551537be60e895e96cd35f7392db308ac123..ab62415d7f543e3adaaabf5bf4f90abf083dc533 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -338,9 +338,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements try { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - System.out.println("Broker body:" + new String(request.getBody())); - System.out.println("Broker bodetaildy:" + topicQueueMappingDetail.toJson()); - this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java index a71c50a68595898685fcb849f830a5a9af47f009..ce123021d459b69b8f26ccf43f2b22ebe6216082 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java @@ -17,13 +17,12 @@ package org.apache.rocketmq.common.protocol.body; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - public class TopicConfigSerializeWrapper extends RemotingSerializable { private ConcurrentMap topicConfigTable = new ConcurrentHashMap(); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 6e6b15ba2261d54e43216087fb80d048e59614f7..5f843a9968b0185eb0777ccbfea13e5706a5c404 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -49,11 +49,12 @@ public class TopicQueueMappingUtils { } private void freshState() { - int minNum = -1; + int minNum = Integer.MAX_VALUE; for (Map.Entry entry : brokerNumMap.entrySet()) { - if (entry.getValue() > minNum) { + if (entry.getValue() < minNum) { leastBrokers.clear(); leastBrokers.add(entry.getKey()); + minNum = entry.getValue(); } else if (entry.getValue() == minNum) { leastBrokers.add(entry.getKey()); } diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6a61de5bcd95f4c4833942357890bfdc30571a24 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java @@ -0,0 +1,61 @@ +package org.apache.rocketmq.common.statictopic; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class TopicMappingUtilsTest { + + + private Map buildBrokerNumMap(int num) { + Map map = new HashMap(); + for (int i = 0; i < num; i++) { + map.put("broker" + i, 0); + } + return map; + } + + private void testIdToBroker(Map idToBroker, Map brokerNumMap) { + Map brokerNumOther = new HashMap(); + for (int i = 0; i < idToBroker.size(); i++) { + Assert.assertTrue(idToBroker.containsKey(i)); + String broker = idToBroker.get(i); + if (brokerNumOther.containsKey(broker)) { + brokerNumOther.put(broker, brokerNumOther.get(broker) + 1); + } else { + brokerNumOther.put(broker, 1); + } + } + Assert.assertEquals(brokerNumMap.size(), brokerNumOther.size()); + for (Map.Entry entry: brokerNumOther.entrySet()) { + Assert.assertEquals(entry.getValue(), brokerNumMap.get(entry.getKey())); + } + } + + @Test + public void testAllocator() { + //stability + for (int i = 0; i < 10; i++) { + int num = 3; + Map brokerNumMap = buildBrokerNumMap(num); + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap(), brokerNumMap); + allocator.upToNum(num * 2); + for (Map.Entry entry: allocator.getBrokerNumMap().entrySet()) { + Assert.assertEquals(2L, entry.getValue().longValue()); + } + Assert.assertEquals(num * 2, allocator.getIdToBroker().size()); + testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap()); + + allocator.upToNum(num * 3 - 1); + + for (Map.Entry entry: allocator.getBrokerNumMap().entrySet()) { + Assert.assertTrue(entry.getValue() >= 2); + Assert.assertTrue(entry.getValue() <= 3); + } + Assert.assertEquals(num * 3 - 1, allocator.getIdToBroker().size()); + testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap()); + } + } +} diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index dac33d0eee88703865064eea1f3000f8ca659954..bb21dc294ad7e2148f6a4a7efddcf59f38e71012 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -57,7 +57,6 @@ public class StaticTopicIT extends BaseConf { String broker = entry.getKey(); String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = entry.getValue(); - System.out.println(configMapping.getMappingDetail().toJson()); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); } }