提交 1dacb667 编写于 作者: D dongeforever

Test utils for static topic

上级 1074ef78
...@@ -234,6 +234,43 @@ public class TopicQueueMappingUtils { ...@@ -234,6 +234,43 @@ public class TopicQueueMappingUtils {
} }
} }
public static void checkIfReusePhysicalQueue(Collection<TopicQueueMappingOne> mappingOnes) {
Map<String, TopicQueueMappingOne> physicalQueueIdMap = new HashMap<String, TopicQueueMappingOne>();
for (TopicQueueMappingOne mappingOne : mappingOnes) {
for (LogicQueueMappingItem item: mappingOne.items) {
String physicalQueueId = item.getBname() + "-" + item.getQueueId();
if (physicalQueueIdMap.containsKey(physicalQueueId)) {
throw new RuntimeException(String.format("Topic %s global queue id %d and %d shared the same physical queue %s",
mappingOne.topic, mappingOne.globalId, physicalQueueIdMap.get(physicalQueueId).globalId, physicalQueueId));
} else {
physicalQueueIdMap.put(physicalQueueId, mappingOne);
}
}
}
}
public static void checkPhysicalQueueConsistence(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
assert configMapping != null;
assert configMapping.getMappingDetail() != null;
if (configMapping.getReadQueueNums() < configMapping.getWriteQueueNums()) {
throw new RuntimeException("Read queues is smaller than write queues");
}
for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
for (LogicQueueMappingItem item: items) {
if (item.getStartOffset() != 0) {
throw new RuntimeException("The start offset dose not begin from 0");
}
TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
if (item.getQueueId() >= topicConfig.getWriteQueueNums()) {
throw new RuntimeException("The physical queue id is overflow the write queues");
}
}
}
}
}
public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) { public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() { Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override @Override
...@@ -275,6 +312,7 @@ public class TopicQueueMappingUtils { ...@@ -275,6 +312,7 @@ public class TopicQueueMappingUtils {
} }
} }
} }
checkIfReusePhysicalQueue(globalIdMap.values());
return globalIdMap; return globalIdMap;
} }
...@@ -312,12 +350,23 @@ public class TopicQueueMappingUtils { ...@@ -312,12 +350,23 @@ public class TopicQueueMappingUtils {
} }
} }
public static void checkIfTargetBrokersComplete(Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
for (String broker : brokerConfigMap.keySet()) {
if (!targetBrokers.contains(broker)) {
throw new RuntimeException("The existed broker " + broker + " dose not in target brokers ");
}
}
}
public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) { public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>(); Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum); Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) { if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
checkIfReusePhysicalQueue(globalIdMap.values());
checkPhysicalQueueConsistence(brokerConfigMap);
} }
if (queueNum < globalIdMap.size()) { if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size())); throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
...@@ -377,9 +426,12 @@ public class TopicQueueMappingUtils { ...@@ -377,9 +426,12 @@ public class TopicQueueMappingUtils {
configMapping.getMappingDetail().setTotalQueues(queueNum); configMapping.getMappingDetail().setTotalQueues(queueNum);
} }
//double check the config //double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); {
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
checkIfReusePhysicalQueue(globalIdMap.values());
checkPhysicalQueueConsistence(brokerConfigMap);
}
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, brokerConfigMap, new HashSet<String>(), new HashSet<String>()); return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, brokerConfigMap, new HashSet<String>(), new HashSet<String>());
} }
...@@ -454,7 +506,7 @@ public class TopicQueueMappingUtils { ...@@ -454,7 +506,7 @@ public class TopicQueueMappingUtils {
List<LogicQueueMappingItem> items = new ArrayList<LogicQueueMappingItem>(topicQueueMappingOne.getItems()); List<LogicQueueMappingItem> items = new ArrayList<LogicQueueMappingItem>(topicQueueMappingOne.getItems());
LogicQueueMappingItem last = items.get(items.size() - 1); LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1)); items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items); ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object //Use the same object
...@@ -469,8 +521,10 @@ public class TopicQueueMappingUtils { ...@@ -469,8 +521,10 @@ public class TopicQueueMappingUtils {
} }
//double check //double check
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); {
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
}
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut); return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut);
} }
......
package org.apache.rocketmq.common.statictopic; package org.apache.rocketmq.common.statictopic;
import org.apache.rocketmq.common.TopicConfig;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
public class TopicMappingUtilsTest { public class TopicMappingUtilsTest {
private Set<String> buildTargetBrokers(int num) {
Set<String> brokers = new HashSet<String>();
for (int i = 0; i < num; i++) {
brokers.add("broker" + i);
}
return brokers;
}
private Map<String, Integer> buildBrokerNumMap(int num) { private Map<String, Integer> buildBrokerNumMap(int num) {
Map<String, Integer> map = new HashMap<String, Integer>(); Map<String, Integer> map = new HashMap<String, Integer>();
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
...@@ -58,4 +72,53 @@ public class TopicMappingUtilsTest { ...@@ -58,4 +72,53 @@ public class TopicMappingUtilsTest {
testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap()); testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap());
} }
} }
@Test(expected = RuntimeException.class)
public void testTargetBrokersComplete() {
String topic = "static";
String broker1 = "broker1";
String broker2 = "broker2";
Set<String> targetBrokers = new HashSet<String>();
targetBrokers.add(broker1);
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, 0, broker2, 0)));
TopicQueueMappingUtils.checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
}
@Test
public void testCreateStaticTopic() {
String topic = "static";
int queueNum;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
for (int i = 1; i < 10; i++) {
Set<String> targetBrokers = buildTargetBrokers(2 * i);
queueNum = 10 * i;
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2 * i, brokerConfigMap.size());
//do the check manually
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
Assert.assertEquals(5, configMapping.getReadQueueNums());
Assert.assertEquals(5, configMapping.getWriteQueueNums());
for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
for (LogicQueueMappingItem item: items) {
Assert.assertEquals(0, item.getStartOffset());
Assert.assertEquals(0, item.getLogicOffset());
TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums());
}
}
}
}
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册