提交 a493ca44 编写于 作者: D dongeforever

Add the allocator test

上级 8962581e
...@@ -338,9 +338,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -338,9 +338,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
try { try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
System.out.println("Broker body:" + new String(request.getBody()));
System.out.println("Broker bodetaildy:" + topicQueueMappingDetail.toJson());
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force); this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force);
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
......
...@@ -17,13 +17,12 @@ ...@@ -17,13 +17,12 @@
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TopicConfigSerializeWrapper extends RemotingSerializable { public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable = private ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(); new ConcurrentHashMap<String, TopicConfig>();
......
...@@ -49,11 +49,12 @@ public class TopicQueueMappingUtils { ...@@ -49,11 +49,12 @@ public class TopicQueueMappingUtils {
} }
private void freshState() { private void freshState() {
int minNum = -1; int minNum = Integer.MAX_VALUE;
for (Map.Entry<String, Integer> entry : brokerNumMap.entrySet()) { for (Map.Entry<String, Integer> entry : brokerNumMap.entrySet()) {
if (entry.getValue() > minNum) { if (entry.getValue() < minNum) {
leastBrokers.clear(); leastBrokers.clear();
leastBrokers.add(entry.getKey()); leastBrokers.add(entry.getKey());
minNum = entry.getValue();
} else if (entry.getValue() == minNum) { } else if (entry.getValue() == minNum) {
leastBrokers.add(entry.getKey()); leastBrokers.add(entry.getKey());
} }
......
package org.apache.rocketmq.common.statictopic;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class TopicMappingUtilsTest {
private Map<String, Integer> buildBrokerNumMap(int num) {
Map<String, Integer> map = new HashMap<String, Integer>();
for (int i = 0; i < num; i++) {
map.put("broker" + i, 0);
}
return map;
}
private void testIdToBroker(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
Map<String, Integer> brokerNumOther = new HashMap<String, Integer>();
for (int i = 0; i < idToBroker.size(); i++) {
Assert.assertTrue(idToBroker.containsKey(i));
String broker = idToBroker.get(i);
if (brokerNumOther.containsKey(broker)) {
brokerNumOther.put(broker, brokerNumOther.get(broker) + 1);
} else {
brokerNumOther.put(broker, 1);
}
}
Assert.assertEquals(brokerNumMap.size(), brokerNumOther.size());
for (Map.Entry<String, Integer> entry: brokerNumOther.entrySet()) {
Assert.assertEquals(entry.getValue(), brokerNumMap.get(entry.getKey()));
}
}
@Test
public void testAllocator() {
//stability
for (int i = 0; i < 10; i++) {
int num = 3;
Map<String, Integer> brokerNumMap = buildBrokerNumMap(num);
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
allocator.upToNum(num * 2);
for (Map.Entry<String, Integer> entry: allocator.getBrokerNumMap().entrySet()) {
Assert.assertEquals(2L, entry.getValue().longValue());
}
Assert.assertEquals(num * 2, allocator.getIdToBroker().size());
testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap());
allocator.upToNum(num * 3 - 1);
for (Map.Entry<String, Integer> entry: allocator.getBrokerNumMap().entrySet()) {
Assert.assertTrue(entry.getValue() >= 2);
Assert.assertTrue(entry.getValue() <= 3);
}
Assert.assertEquals(num * 3 - 1, allocator.getIdToBroker().size());
testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap());
}
}
}
...@@ -57,7 +57,6 @@ public class StaticTopicIT extends BaseConf { ...@@ -57,7 +57,6 @@ public class StaticTopicIT extends BaseConf {
String broker = entry.getKey(); String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker); String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue(); TopicConfigAndQueueMapping configMapping = entry.getValue();
System.out.println(configMapping.getMappingDetail().toJson());
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册