提交 5ce093e1 编写于 作者: D dongeforever

Add test for remapping static topic

上级 a172c769
......@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.common.statictopic;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.List;
......@@ -49,4 +51,30 @@ public class TopicQueueMappingOne extends RemotingSerializable {
public List<LogicQueueMappingItem> getItems() {
return items;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TopicQueueMappingOne)) return false;
TopicQueueMappingOne that = (TopicQueueMappingOne) o;
return new EqualsBuilder()
.append(topic, that.topic)
.append(bname, that.bname)
.append(globalId, that.globalId)
.append(items, that.items)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(topic)
.append(bname)
.append(globalId)
.append(items)
.toHashCode();
}
}
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
......@@ -122,7 +121,7 @@ public class TopicQueueMappingUtils {
return detailList;
}
public static Map.Entry<Long, Integer> checkConsistenceOfTopicConfigAndQueueMapping(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
public static Map.Entry<Long, Integer> checkNameEpochNumConsistence(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
if (brokerConfigMap == null
|| brokerConfigMap.isEmpty()) {
return null;
......@@ -212,6 +211,11 @@ public class TopicQueueMappingUtils {
|| item.getQueueId() < 0) {
throw new RuntimeException("The field is illegal, should not be negative");
}
if (items.size() >= 2
&& i <= items.size() - 2
&& items.get(i).getLogicOffset() < 0) {
throw new RuntimeException("The non-latest item has negative logic offset");
}
if (lastGen != -1 && item.getGen() >= lastGen) {
throw new RuntimeException("The gen dose not increase monotonically");
}
......@@ -249,6 +253,14 @@ public class TopicQueueMappingUtils {
}
}
public static void checkLeaderInTargetBrokers(Collection<TopicQueueMappingOne> mappingOnes, Set<String> targetBrokers) {
for (TopicQueueMappingOne mappingOne : mappingOnes) {
if (!targetBrokers.contains(mappingOne.bname)) {
throw new RuntimeException("The leader broker does not in target broker");
}
}
}
public static void checkPhysicalQueueConsistence(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
......@@ -264,7 +276,7 @@ public class TopicQueueMappingUtils {
}
TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
if (topicConfig == null) {
throw new RuntimeException("The broker dose not exist");
throw new RuntimeException("The broker of item dose not exist");
}
if (item.getQueueId() >= topicConfig.getWriteQueueNums()) {
throw new RuntimeException("The physical queue id is overflow the write queues");
......@@ -274,6 +286,8 @@ public class TopicQueueMappingUtils {
}
}
public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
......@@ -366,7 +380,7 @@ public class TopicQueueMappingUtils {
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
checkIfReusePhysicalQueue(globalIdMap.values());
checkPhysicalQueueConsistence(brokerConfigMap);
......@@ -430,8 +444,8 @@ public class TopicQueueMappingUtils {
}
//double check the config
{
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
checkIfReusePhysicalQueue(globalIdMap.values());
checkPhysicalQueueConsistence(brokerConfigMap);
}
......@@ -440,8 +454,11 @@ public class TopicQueueMappingUtils {
public static TopicRemappingDetailWrapper remappingStaticTopic(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, Set<String> targetBrokers) {
final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
final Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
......@@ -449,7 +466,6 @@ public class TopicQueueMappingUtils {
for (String broker: targetBrokers) {
brokerNumMap.put(broker, 0);
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
......@@ -504,12 +520,17 @@ public class TopicQueueMappingUtils {
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
if (mapInConfig == null) {
mapInConfig = new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, maxNum, mapInBroker, newEpoch));
brokerConfigMap.put(mapInBroker, mapInConfig);
}
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setReadQueueNums(mapInConfig.getReadQueueNums() + 1);
List<LogicQueueMappingItem> items = new ArrayList<LogicQueueMappingItem>(topicQueueMappingOne.getItems());
LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1));
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
//Use the same object
TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, items);
......@@ -524,10 +545,12 @@ public class TopicQueueMappingUtils {
//double check
{
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
TopicQueueMappingUtils.checkLeaderInTargetBrokers(globalIdMap.values(), targetBrokers);
}
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut);
}
......
......@@ -15,11 +15,14 @@ import java.util.Set;
public class TopicMappingUtilsTest {
private Set<String> buildTargetBrokers(int num) {
return buildTargetBrokers(num, "");
}
private Set<String> buildTargetBrokers(int num, String suffix) {
Set<String> brokers = new HashSet<String>();
for (int i = 0; i < num; i++) {
brokers.add("broker" + i);
brokers.add("broker" + suffix + i);
}
return brokers;
}
......@@ -102,7 +105,7 @@ public class TopicMappingUtilsTest {
Assert.assertEquals(2 * i, brokerConfigMap.size());
//do the check manually
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
......@@ -125,7 +128,52 @@ public class TopicMappingUtilsTest {
}
@Test
public void testCreateStaticTopic_Error() {
public void testRemappingStaticTopic() {
String topic = "static";
int queueNum = 7;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> originalBrokers = buildTargetBrokers(2);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
{
//do the check manually
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
}
for (int i = 0; i < 10; i++) {
Set<String> targetBrokers = buildTargetBrokers(2, "test" + i);
TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
//do the check manually
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
TopicQueueMappingUtils.checkLeaderInTargetBrokers(globalIdMap.values(), targetBrokers);
Assert.assertEquals((i + 2) * 2, brokerConfigMap.size());
//check and complete the logicOffset
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
if (!targetBrokers.contains(configMapping.getMappingDetail().bname)) {
continue;
}
for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
Assert.assertEquals(i + 2, items.size());
items.get(items.size() - 1).setLogicOffset(i + 1);
}
}
}
}
@Test
public void testUtilsCheck() {
String topic = "static";
int queueNum = 10;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
......@@ -135,33 +183,33 @@ public class TopicMappingUtilsTest {
Assert.assertEquals(2, brokerConfigMap.size());
TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
List<LogicQueueMappingItem> items = configMapping.getMappingDetail().getHostedQueues().values().iterator().next();
Map.Entry<Long, Integer> maxEpochNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
Map.Entry<Long, Integer> maxEpochNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
int exceptionNum = 0;
try {
configMapping.getMappingDetail().setTopic("xxxx");
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
} catch (RuntimeException ignore) {
exceptionNum++;
configMapping.getMappingDetail().setTopic(topic);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
}
try {
configMapping.getMappingDetail().setTotalQueues(1);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
} catch (RuntimeException ignore) {
exceptionNum++;
configMapping.getMappingDetail().setTotalQueues(10);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
}
try {
configMapping.getMappingDetail().setEpoch(0);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
} catch (RuntimeException ignore) {
exceptionNum++;
configMapping.getMappingDetail().setEpoch(maxEpochNum.getKey());
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
}
......
......@@ -72,7 +72,7 @@ public class StaticTopicIT extends BaseConf {
Assert.assertNotNull(localConfigMapping);
Assert.assertEquals(configMapping, localConfigMapping);
}
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, remoteBrokerConfigMap);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
}
......
......@@ -16,22 +16,15 @@
*/
package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -40,16 +33,11 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
public class RemappingStaticTopicSubCommand implements SubCommand {
......@@ -108,7 +96,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
String mapData = MixAll.file2String(mapFileName);
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
......@@ -202,7 +190,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
if (brokerConfigMap.isEmpty()) {
throw new RuntimeException("No topic route to do the remapping");
}
Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
......
......@@ -16,21 +16,14 @@
*/
package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -45,7 +38,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class UpdateStaticTopicSubCommand implements SubCommand {
......@@ -107,7 +99,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
String mapData = MixAll.file2String(mapFileName);
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, wrapper.getBrokerConfigMap());
boolean force = false;
if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
force = true;
......@@ -215,7 +207,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
}
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册