diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 3ef45e05168fc56274d11f096a8a17920779b7af..1b5290406000a5a0934550da59a28c7a11849f9c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -205,15 +205,13 @@ public class TopicQueueMappingUtils { LogicQueueMappingItem newItem = newItems.get(inew); LogicQueueMappingItem oldItem = oldItems.get(iold); if (newItem.getGen() < oldItem.getGen()) { - //the old one may have been deleted + //the earliest item may have been deleted concurrently inew++; } else if (oldItem.getGen() < newItem.getGen()){ - //the new one may be the "delete one from " - if (isCLean) { - iold++; - } else { - throw new RuntimeException("The new item-list has less items than old item-list"); - } + //in the following cases, the new item-list has less items than old item-list + //1. the queue is mapped back to a broker which hold the logic queue before + //2. The earliest item is deleted by TopicQueueMappingCleanService + iold++; } else { assert oldItem.getBname().equals(newItem.getBname()); assert oldItem.getQueueId() == newItem.getQueueId(); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index ddeee6ee10dcc23fb91259207dc2e8f4eed99d37..a92389ea37ac5af4c832b8cb2035267c3aaffb21 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -56,6 +56,8 @@ public class BaseConf { public final static String nsAddr; protected final static String broker1Name; protected final static String broker2Name; + //the logic queue test need at least three brokers + protected final static String broker3Name; protected final static String clusterName; protected final static int brokerNum; protected final static int waitTime = 5; @@ -64,6 +66,7 @@ public class BaseConf { protected final static NamesrvController namesrvController; protected final static BrokerController brokerController1; protected final static BrokerController brokerController2; + protected final static BrokerController brokerController3; protected final static List brokerControllerList; protected final static Map brokerControllerMap; protected final static List mqClients = new ArrayList(); @@ -76,11 +79,13 @@ public class BaseConf { nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort(); brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr); brokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr); + brokerController3 = IntegrationTestBase.createAndStartBroker(nsAddr); clusterName = brokerController1.getBrokerConfig().getBrokerClusterName(); broker1Name = brokerController1.getBrokerConfig().getBrokerName(); broker2Name = brokerController2.getBrokerConfig().getBrokerName(); - brokerNum = 2; - brokerControllerList = ImmutableList.of(brokerController1, brokerController2); + broker3Name = brokerController3.getBrokerConfig().getBrokerName(); + brokerNum = 3; + brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3); brokerControllerMap = brokerControllerList.stream().collect(Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity())); } @@ -214,6 +219,7 @@ public class BaseConf { Set brokers = new HashSet<>(); brokers.add(broker1Name); brokers.add(broker2Name); + brokers.add(broker3Name); return brokers; } diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java index 2bec8a0ac9db66da16e7c4bfa723c898af3eb2a0..14bb96783b8b58c8513411cd51798bdb7f5f09af 100644 --- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java @@ -60,7 +60,7 @@ public class StaticTopicIT extends BaseConf { targetBrokers.add(broker1Name); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); - Assert.assertEquals(2, remoteBrokerConfigMap.size()); + Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size()); TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Assert.assertEquals(queueNum, globalIdMap.size()); @@ -75,7 +75,7 @@ public class StaticTopicIT extends BaseConf { targetBrokers.add(broker2Name); MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); - Assert.assertEquals(2, remoteBrokerConfigMap.size()); + Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size()); TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Assert.assertEquals(queueNum, globalIdMap.size()); @@ -97,7 +97,7 @@ public class StaticTopicIT extends BaseConf { //check the static topic config { Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); - Assert.assertEquals(2, remoteBrokerConfigMap.size()); + Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size()); for (Map.Entry entry: remoteBrokerConfigMap.entrySet()) { String broker = entry.getKey(); TopicConfigAndQueueMapping configMapping = entry.getValue(); @@ -356,6 +356,8 @@ public class StaticTopicIT extends BaseConf { } } + + @Test public void testRemappingWithNegativeLogicOffset() throws Exception { String topic = "static" + MQRandomUtils.getRandomTopic();