提交 a172c769 编写于 作者: D dongeforever

Finish the test for utils of createStaticTopic

上级 1dacb667
......@@ -263,6 +263,9 @@ public class TopicQueueMappingUtils {
throw new RuntimeException("The start offset dose not begin from 0");
}
TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
if (topicConfig == null) {
throw new RuntimeException("The broker dose not exist");
}
if (item.getQueueId() >= topicConfig.getWriteQueueNums()) {
throw new RuntimeException("The physical queue id is overflow the write queues");
}
......@@ -406,7 +409,7 @@ public class TopicQueueMappingUtils {
}
TopicConfigAndQueueMapping configMapping;
if (!brokerConfigMap.containsKey(broker)) {
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, System.currentTimeMillis()));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
brokerConfigMap.put(broker, configMapping);
......@@ -416,7 +419,7 @@ public class TopicQueueMappingUtils {
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, ImmutableList.of(mappingItem));
TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem)));
}
// set the topic config
......@@ -508,10 +511,9 @@ public class TopicQueueMappingUtils {
LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, resultItems);
TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, resultItems);
TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, items);
TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, items);
}
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
......
......@@ -5,6 +5,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
......@@ -110,6 +111,7 @@ public class TopicMappingUtilsTest {
TopicConfigAndQueueMapping configMapping = entry.getValue();
Assert.assertEquals(5, configMapping.getReadQueueNums());
Assert.assertEquals(5, configMapping.getWriteQueueNums());
Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis());
for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
for (LogicQueueMappingItem item: items) {
Assert.assertEquals(0, item.getStartOffset());
......@@ -121,4 +123,77 @@ public class TopicMappingUtilsTest {
}
}
}
@Test
public void testCreateStaticTopic_Error() {
String topic = "static";
int queueNum = 10;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> targetBrokers = buildTargetBrokers(2);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
List<LogicQueueMappingItem> items = configMapping.getMappingDetail().getHostedQueues().values().iterator().next();
Map.Entry<Long, Integer> maxEpochNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
int exceptionNum = 0;
try {
configMapping.getMappingDetail().setTopic("xxxx");
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
} catch (RuntimeException ignore) {
exceptionNum++;
configMapping.getMappingDetail().setTopic(topic);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
}
try {
configMapping.getMappingDetail().setTotalQueues(1);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
} catch (RuntimeException ignore) {
exceptionNum++;
configMapping.getMappingDetail().setTotalQueues(10);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
}
try {
configMapping.getMappingDetail().setEpoch(0);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
} catch (RuntimeException ignore) {
exceptionNum++;
configMapping.getMappingDetail().setEpoch(maxEpochNum.getKey());
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
}
try {
configMapping.getMappingDetail().getHostedQueues().put(10000, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(new LogicQueueMappingItem(1, 1, targetBrokers.iterator().next(), 0, 0, -1, -1, -1))));
TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
} catch (RuntimeException ignore) {
exceptionNum++;
configMapping.getMappingDetail().getHostedQueues().remove(10000);
TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
}
try {
configMapping.setWriteQueueNums(1);
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
} catch (RuntimeException ignore) {
exceptionNum++;
configMapping.setWriteQueueNums(5);
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
}
try {
items.add(new LogicQueueMappingItem(1, 1, targetBrokers.iterator().next(), 0, 0, -1, -1, -1));
Map<Integer, TopicQueueMappingOne> map = TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(map.values());
} catch (RuntimeException ignore) {
exceptionNum++;
items.remove(items.size() - 1);
Map<Integer, TopicQueueMappingOne> map = TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(map.values());
}
Assert.assertEquals(6, exceptionNum);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册