提交 ad993e30 编写于 作者: D dongeforever

Use 3 brokers for IT test

上级 1ca218f1
......@@ -205,15 +205,13 @@ public class TopicQueueMappingUtils {
LogicQueueMappingItem newItem = newItems.get(inew);
LogicQueueMappingItem oldItem = oldItems.get(iold);
if (newItem.getGen() < oldItem.getGen()) {
//the old one may have been deleted
//the earliest item may have been deleted concurrently
inew++;
} else if (oldItem.getGen() < newItem.getGen()){
//the new one may be the "delete one from "
if (isCLean) {
//in the following cases, the new item-list has less items than old item-list
//1. the queue is mapped back to a broker which hold the logic queue before
//2. The earliest item is deleted by TopicQueueMappingCleanService
iold++;
} else {
throw new RuntimeException("The new item-list has less items than old item-list");
}
} else {
assert oldItem.getBname().equals(newItem.getBname());
assert oldItem.getQueueId() == newItem.getQueueId();
......
......@@ -56,6 +56,8 @@ public class BaseConf {
public final static String nsAddr;
protected final static String broker1Name;
protected final static String broker2Name;
//the logic queue test need at least three brokers
protected final static String broker3Name;
protected final static String clusterName;
protected final static int brokerNum;
protected final static int waitTime = 5;
......@@ -64,6 +66,7 @@ public class BaseConf {
protected final static NamesrvController namesrvController;
protected final static BrokerController brokerController1;
protected final static BrokerController brokerController2;
protected final static BrokerController brokerController3;
protected final static List<BrokerController> brokerControllerList;
protected final static Map<String, BrokerController> brokerControllerMap;
protected final static List<Object> mqClients = new ArrayList<Object>();
......@@ -76,11 +79,13 @@ public class BaseConf {
nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
brokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr);
brokerController3 = IntegrationTestBase.createAndStartBroker(nsAddr);
clusterName = brokerController1.getBrokerConfig().getBrokerClusterName();
broker1Name = brokerController1.getBrokerConfig().getBrokerName();
broker2Name = brokerController2.getBrokerConfig().getBrokerName();
brokerNum = 2;
brokerControllerList = ImmutableList.of(brokerController1, brokerController2);
broker3Name = brokerController3.getBrokerConfig().getBrokerName();
brokerNum = 3;
brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3);
brokerControllerMap = brokerControllerList.stream().collect(Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity()));
}
......@@ -214,6 +219,7 @@ public class BaseConf {
Set<String> brokers = new HashSet<>();
brokers.add(broker1Name);
brokers.add(broker2Name);
brokers.add(broker3Name);
return brokers;
}
......
......@@ -60,7 +60,7 @@ public class StaticTopicIT extends BaseConf {
targetBrokers.add(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
Assert.assertEquals(2, remoteBrokerConfigMap.size());
Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
......@@ -75,7 +75,7 @@ public class StaticTopicIT extends BaseConf {
targetBrokers.add(broker2Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
Assert.assertEquals(2, remoteBrokerConfigMap.size());
Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
......@@ -97,7 +97,7 @@ public class StaticTopicIT extends BaseConf {
//check the static topic config
{
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
Assert.assertEquals(2, remoteBrokerConfigMap.size());
Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet()) {
String broker = entry.getKey();
TopicConfigAndQueueMapping configMapping = entry.getValue();
......@@ -356,6 +356,8 @@ public class StaticTopicIT extends BaseConf {
}
}
@Test
public void testRemappingWithNegativeLogicOffset() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册