diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 975a5ba003cd017e2463cc9cb8d2fb1a069a8331..e56d585371ad26d1f20c25cd3fe019f57b8053ce 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -422,9 +422,8 @@ public class TopicQueueMappingUtils { } } - public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set targetBrokers, Set nonTargetBrokers, Map brokerConfigMap) { + public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set targetBrokers, Map brokerConfigMap) { checkTargetBrokersComplete(targetBrokers, brokerConfigMap); - checkNonTargetBrokers(targetBrokers, nonTargetBrokers); Map globalIdMap = new HashMap(); Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry(System.currentTimeMillis(), queueNum); if (!brokerConfigMap.isEmpty()) { @@ -484,12 +483,6 @@ public class TopicQueueMappingUtils { TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList(Collections.singletonList(mappingItem))); } - //set the non target brokers - for (String broker : nonTargetBrokers) { - if (!brokerConfigMap.containsKey(broker)) { - brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch))); - } - } // set the topic config for (Map.Entry entry : brokerConfigMap.entrySet()) { TopicConfigAndQueueMapping configMapping = entry.getValue(); diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java index 93cad48a3d85b79db4a253f208835c19a810d8bb..f5cd0efafd2a03f9b824f2443468177e3ed95b31 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java @@ -128,9 +128,9 @@ public class TopicQueueMappingUtilsTest { Set targetBrokers = buildTargetBrokers(2 * i); Set nonTargetBrokers = buildTargetBrokers(2 * i, "test"); queueNum = 10 * i; - TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, nonTargetBrokers, brokerConfigMap); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); - Assert.assertEquals(4 * i, brokerConfigMap.size()); + Assert.assertEquals(2 * i, brokerConfigMap.size()); //do the check manually Map.Entry maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); @@ -168,7 +168,7 @@ public class TopicQueueMappingUtilsTest { int queueNum = 7; Map brokerConfigMap = new HashMap(); Set originalBrokers = buildTargetBrokers(2); - TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet(), brokerConfigMap); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); Assert.assertEquals(2, brokerConfigMap.size()); @@ -213,7 +213,7 @@ public class TopicQueueMappingUtilsTest { Map brokerConfigMap = new HashMap(); Set originalBrokers = buildTargetBrokers(2); { - TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet(), brokerConfigMap); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); Assert.assertEquals(2, brokerConfigMap.size()); } @@ -234,7 +234,7 @@ public class TopicQueueMappingUtilsTest { int queueNum = 10; Map brokerConfigMap = new HashMap(); Set targetBrokers = buildTargetBrokers(2); - TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet(), brokerConfigMap); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); Assert.assertEquals(2, brokerConfigMap.size()); TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next(); diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java similarity index 76% rename from test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java rename to test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java index 8c0caa6a2711f2fa7f3f596e7f37f9ba567f1c44..9de620504dc3ce560c75e200c4eebb6171f429db 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.test.util; import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ForkJoinPool; @@ -25,12 +26,16 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; +import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.MQAdminUtils; import org.apache.rocketmq.tools.command.CommandUtil; -public class MQAdmin { - private static Logger log = Logger.getLogger(MQAdmin.class); +public class MQAdminTestUtils { + private static Logger log = Logger.getLogger(MQAdminTestUtils.class); public static boolean createTopic(String nameSrvAddr, String clusterName, String topic, int queueNum) { @@ -163,4 +168,23 @@ public class MQAdmin { ForkJoinPool.commonPool().execute(mqAdminExt::shutdown); } + //should only be test, if some middle operation failed, it dose not backup the brokerConfigMap + public static Map createStaticTopic(String topic, int queueNum, Set targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception { + Map brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); + assert brokerConfigMap.isEmpty(); + TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); + MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt); + MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false); + return brokerConfigMap; + } + + //should only be test, if some middle operation failed, it dose not backup the brokerConfigMap + public static void remappingStaticTopic(String topic, Set targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception { + Map brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); + assert !brokerConfigMap.isEmpty(); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers); + MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt); + MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt); + } + } diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 53a7ab3bcb3d840c26f3b15f8219e108c6d80074..ddeee6ee10dcc23fb91259207dc2e8f4eed99d37 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -31,8 +31,6 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.MQPushConsumer; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MQProducer; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.MQVersion; @@ -47,7 +45,7 @@ import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer; import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; import org.apache.rocketmq.test.factory.ConsumerFactory; import org.apache.rocketmq.test.listener.AbstractListener; -import org.apache.rocketmq.test.util.MQAdmin; +import org.apache.rocketmq.test.util.MQAdminTestUtils; import org.apache.rocketmq.test.util.MQRandomUtils; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt; @@ -122,7 +120,7 @@ public class BaseConf { } public static String initConsumerGroup(String group) { - MQAdmin.createSub(nsAddr, clusterName, group); + MQAdminTestUtils.createSub(nsAddr, clusterName, group); return group; } diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 50dc8fc64ab4c5cbee3946301e72d1cd64966c2f..06f079c4d5806a87feecc37470adc66e70d65954 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -34,7 +34,7 @@ import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.test.util.MQAdmin; +import org.apache.rocketmq.test.util.MQAdminTestUtils; import org.apache.rocketmq.test.util.TestUtils; public class IntegrationTestBase { @@ -166,7 +166,7 @@ public class IntegrationTestBase { boolean createResult; while (true) { - createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, queueNumbers); + createResult = MQAdminTestUtils.createTopic(nsAddr, clusterName, topic, queueNumbers); if (createResult) { break; } else if (System.currentTimeMillis() - startTime > topicCreateTime) { diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index c1cc60be99c47fce62cdc3f2e222bbd628cd3242..aeddef6dd697f3beade06f8fe8b42daed19670a4 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -1,10 +1,8 @@ package org.apache.rocketmq.test.smoke; import org.apache.log4j.Logger; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageClientExt; -import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ClusterInfo; @@ -12,20 +10,20 @@ import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; -import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; +import org.apache.rocketmq.test.util.MQAdminTestUtils; import org.apache.rocketmq.test.util.MQRandomUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.MQAdminUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.FixMethodOrder; import org.junit.Test; -import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess; import java.util.ArrayList; import java.util.Collection; @@ -45,50 +43,44 @@ public class StaticTopicIT extends BaseConf { private static Logger logger = Logger.getLogger(StaticTopicIT.class); private DefaultMQAdminExt defaultMQAdminExt; - private ClientMetadata clientMetadata; @Before public void setUp() throws Exception { System.setProperty("rocketmq.client.rebalance.waitInterval", "500"); defaultMQAdminExt = getAdmin(nsAddr); waitBrokerRegistered(nsAddr, clusterName); - clientMetadata = new ClientMetadata(); defaultMQAdminExt.start(); - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - if (clusterInfo == null - || clusterInfo.getClusterAddrTable().isEmpty()) { - throw new RuntimeException("The Cluster info is empty"); - } - clientMetadata.refreshClusterInfo(clusterInfo); } - public Map createStaticTopic(String topic, int queueNum, Set targetBrokers) throws Exception { - Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); - Assert.assertTrue(brokerConfigMap.isEmpty()); - TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap); - Assert.assertEquals(targetBrokers.size(), brokerConfigMap.size()); - //If some succeed, and others fail, it will cause inconsistent data - for (Map.Entry entry : brokerConfigMap.entrySet()) { - String broker = entry.getKey(); - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicConfigAndQueueMapping configMapping = entry.getValue(); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); + @Test + public void testNoTargetBrokers() throws Exception { + String topic = "static" + MQRandomUtils.getRandomTopic(); + int queueNum = 10; + { + Set targetBrokers = new HashSet<>(); + targetBrokers.add(broker1Name); + MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); + Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); + Assert.assertEquals(2, remoteBrokerConfigMap.size()); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); + Assert.assertEquals(queueNum, globalIdMap.size()); + TopicConfigAndQueueMapping configMapping = remoteBrokerConfigMap.get(broker2Name); + Assert.assertEquals(0, configMapping.getWriteQueueNums()); + Assert.assertEquals(0, configMapping.getReadQueueNums()); + Assert.assertEquals(0, configMapping.getMappingDetail().getHostedQueues().size()); } - return brokerConfigMap; - } - - public void remappingStaticTopic(String topic, Set targetBrokers) throws Exception { - Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); - Assert.assertFalse(brokerConfigMap.isEmpty()); - TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers); - defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false); - } - - - - @Test - public void testNonTargetBrokers() { + { + Set targetBrokers = new HashSet<>(); + targetBrokers.add(broker2Name); + MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); + Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); + Assert.assertEquals(2, remoteBrokerConfigMap.size()); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); + Assert.assertEquals(queueNum, globalIdMap.size()); + } } @@ -102,10 +94,10 @@ public class StaticTopicIT extends BaseConf { int queueNum = 10; int msgEachQueue = 100; //create static topic - Map localBrokerConfigMap = createStaticTopic(topic, queueNum, getBrokers()); + Map localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt); //check the static topic config { - Map remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); Assert.assertEquals(2, remoteBrokerConfigMap.size()); for (Map.Entry entry: remoteBrokerConfigMap.entrySet()) { String broker = entry.getKey(); @@ -178,7 +170,7 @@ public class StaticTopicIT extends BaseConf { } @Test - public void testDoubleReadCheck() throws Exception { + public void testDoubleReadCheckConsumerOffset() throws Exception { String topic = "static" + MQRandomUtils.getRandomTopic(); String group = initConsumerGroup(); RMQNormalProducer producer = getProducer(nsAddr, topic); @@ -194,7 +186,7 @@ public class StaticTopicIT extends BaseConf { { Set targetBrokers = new HashSet<>(); targetBrokers.add(broker1Name); - createStaticTopic(topic, queueNum, targetBrokers); + MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); } //produce the messages { @@ -217,7 +209,7 @@ public class StaticTopicIT extends BaseConf { { Set targetBrokers = new HashSet<>(); targetBrokers.add(broker2Name); - remappingStaticTopic(topic, targetBrokers); + MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); } //make the metadata @@ -226,8 +218,8 @@ public class StaticTopicIT extends BaseConf { { producer = getProducer(nsAddr, topic); + ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); //just refresh the metadata - defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); List messageQueueList = producer.getMessageQueue(); for(MessageQueue messageQueue: messageQueueList) { producer.send(msgEachQueue, messageQueue); @@ -276,7 +268,7 @@ public class StaticTopicIT extends BaseConf { { Set targetBrokers = new HashSet<>(); targetBrokers.add(broker1Name); - createStaticTopic(topic, queueNum, targetBrokers); + MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); } //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name)); //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name)); @@ -310,8 +302,8 @@ public class StaticTopicIT extends BaseConf { { Set targetBrokers = new HashSet<>(); targetBrokers.add(broker2Name); - remappingStaticTopic(topic, targetBrokers); - Map remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); + Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); @@ -324,6 +316,7 @@ public class StaticTopicIT extends BaseConf { Thread.sleep(500); producer.setDebug(true); { + ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); List messageQueueList = producer.getMessageQueue(); for (int i = 0; i < queueNum; i++) { MessageQueue messageQueue = messageQueueList.get(i); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index a36948cd90e7efb7ed4a12af717cae9f428bd5b8..5c80e86d55f0b8f685294caa9f163795d328a8dd 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -219,11 +219,6 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { return defaultMQAdminExtImpl.examineTopicConfig(addr, topic); } - @Override - public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException { - return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic); - } - @Override public TopicStatsTable examineTopicStats( String topic) throws RemotingException, MQClientException, InterruptedException, @@ -621,9 +616,5 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force); } - @Override - public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set brokersToMapIn, Set brokersToMapOut, Map brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.defaultMQAdminExtImpl.remappingStaticTopic(clientMetadata, topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, blockSeqSize, force); - } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 07c4bf345d0a03f7c4e48accde1ad22d14d128be..7b0450c1ed24a11d9196bedc782461e7e87f3349 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -1055,139 +1055,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } - @Override - public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set brokersToMapIn, Set brokersToMapOut, Map brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - for (String broker : brokerConfigMap.keySet()) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - if (addr == null) { - throw new RuntimeException("Can't find addr for broker " + broker); - } - } - // now do the remapping - //Step1: let the new leader can be write without the logicOffset - for (String broker: brokersToMapIn) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); - createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); - } - //Step2: forbid the write of old leader - for (String broker: brokersToMapOut) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); - createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); - } - //Step3: decide the logic offset - for (String broker: brokersToMapOut) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicStatsTable statsTable = examineTopicStats(addr, topic); - TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker); - for (Map.Entry> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) { - List items = entry.getValue(); - Integer globalId = entry.getKey(); - if (items.size() < 2) { - continue; - } - LogicQueueMappingItem newLeader = items.get(items.size() - 1); - LogicQueueMappingItem oldLeader = items.get(items.size() - 2); - if (newLeader.getLogicOffset() > 0) { - continue; - } - TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId())); - if (topicOffset == null) { - throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader); - } - //TODO check the max offset, will it return -1? - if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { - throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); - } - newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize)); - TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); - //fresh the new leader - TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items); - } - } - //Step4: write to the new leader with logic offset - for (String broker: brokersToMapIn) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); - createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); - } - //Step5: write the non-target brokers - for (String broker: brokerConfigMap.keySet()) { - if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) { - continue; - } - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); - createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); - } - } - - @Override - public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException { - Map brokerConfigMap = new HashMap<>(); - boolean getFromBrokers = false; - TopicRouteData routeData = null; - try { - routeData = examineTopicRouteInfo(topic); - } catch (MQClientException exception) { - if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { - throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage()); - } else { - getFromBrokers = true; - } - } - if (!getFromBrokers) { - if (routeData != null - && !routeData.getQueueDatas().isEmpty()) { - clientMetadata.freshTopicRoute(topic, routeData); - for (QueueData queueData: routeData.getQueueDatas()) { - String bname = queueData.getBrokerName(); - String addr = clientMetadata.findMasterBrokerAddr(bname); - try { - TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); - //allow the config is null - if (mapping != null) { - brokerConfigMap.put(bname, mapping); - } - } catch (MQBrokerException exception) { - if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { - throw exception; - } - } - - } - } - } else { - log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic); - //if cannot get from nameserver, then check all the brokers - ClusterInfo clusterInfo = examineBrokerClusterInfo(); - if (clusterInfo != null - && clusterInfo.getBrokerAddrTable() != null) { - clientMetadata.refreshClusterInfo(clusterInfo); - } - for (Entry> entry : clientMetadata.getBrokerAddrTable().entrySet()) { - String bname = entry.getKey(); - HashMap map = entry.getValue(); - String addr = map.get(MixAll.MASTER_ID); - if (addr != null) { - try { - TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); - //allow the config is null - if (mapping != null) { - brokerConfigMap.put(bname, mapping); - } - } catch (MQBrokerException exception1) { - if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { - throw exception1; - } - } - } - } - } - return brokerConfigMap; - } - @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 6c274438c3ed073376a8939008be6e67e58a7b05..c4838e3b7f651ddb3f890fc64a0c4a23b9523f92 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -320,7 +320,4 @@ public interface MQAdminExt extends MQAdmin { void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException; - Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException; - - void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set brokersToMapIn, Set brokersToMapOut, Map brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..288bac466ffd7a8e5180692d295c90ad659b2cfa --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java @@ -0,0 +1,254 @@ +package org.apache.rocketmq.tools.admin; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.TopicOffset; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; +import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class MQAdminUtils { + + + public static ClientMetadata getBrokerMetadata(DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException { + ClientMetadata clientMetadata = new ClientMetadata(); + refreshClusterInfo(defaultMQAdminExt, clientMetadata); + return clientMetadata; + } + + public static ClientMetadata getBrokerAndTopicMetadata(String topic, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingException, MQBrokerException { + ClientMetadata clientMetadata = new ClientMetadata(); + refreshClusterInfo(defaultMQAdminExt, clientMetadata); + refreshTopicRouteInfo(topic, defaultMQAdminExt, clientMetadata); + return clientMetadata; + } + + public static void refreshClusterInfo(DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo == null + || clusterInfo.getClusterAddrTable().isEmpty()) { + throw new RuntimeException("The Cluster info is empty"); + } + clientMetadata.refreshClusterInfo(clusterInfo); + } + + public static void refreshTopicRouteInfo(String topic, DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) throws RemotingException, InterruptedException, MQBrokerException { + TopicRouteData routeData = null; + try { + routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); + } catch (MQClientException exception) { + if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage()); + } + } + if (routeData != null + && !routeData.getQueueDatas().isEmpty()) { + clientMetadata.freshTopicRoute(topic, routeData); + } + } + + public static Set getAllBrokersInSameCluster(Collection brokers, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo == null + || clusterInfo.getClusterAddrTable().isEmpty()) { + throw new RuntimeException("The Cluster info is empty"); + } + Set allBrokers = new HashSet<>(); + for (String broker: brokers) { + if (allBrokers.contains(broker)) { + continue; + } + for (Set clusterBrokers : clusterInfo.getClusterAddrTable().values()) { + if (clusterBrokers.contains(broker)) { + allBrokers.addAll(clusterBrokers); + break; + } + } + } + return allBrokers; + } + + public static void completeNoTargetBrokers(Map brokerConfigMap, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException { + TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next(); + String topic = configMapping.getTopicName(); + int queueNum = configMapping.getMappingDetail().getTotalQueues(); + long newEpoch = configMapping.getMappingDetail().getEpoch(); + Set allBrokers = getAllBrokersInSameCluster(brokerConfigMap.keySet(), defaultMQAdminExt); + for (String broker: allBrokers) { + if (!brokerConfigMap.containsKey(broker)) { + brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch))); + } + } + } + + public static void checkIfMasterAlive(Collection brokers, DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) { + for (String broker : brokers) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + if (addr == null) { + throw new RuntimeException("Can't find addr for broker " + broker); + } + } + } + + public static void updateTopicConfigMappingAll(Map brokerConfigMap, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception { + ClientMetadata clientMetadata = getBrokerMetadata(defaultMQAdminExt); + checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata); + //If some succeed, and others fail, it will cause inconsistent data + for (Map.Entry entry : brokerConfigMap.entrySet()) { + String broker = entry.getKey(); + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = entry.getValue(); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); + } + } + + public static void remappingStaticTopic(String topic, Set brokersToMapIn, Set brokersToMapOut, Map brokerConfigMap, int blockSeqSize, boolean force, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + + ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt); + MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata); + // now do the remapping + //Step1: let the new leader can be write without the logicOffset + for (String broker: brokersToMapIn) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); + } + //Step2: forbid the write of old leader + for (String broker: brokersToMapOut) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); + } + //Step3: decide the logic offset + for (String broker: brokersToMapOut) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic); + TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker); + for (Map.Entry> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) { + List items = entry.getValue(); + Integer globalId = entry.getKey(); + if (items.size() < 2) { + continue; + } + LogicQueueMappingItem newLeader = items.get(items.size() - 1); + LogicQueueMappingItem oldLeader = items.get(items.size() - 2); + if (newLeader.getLogicOffset() > 0) { + continue; + } + TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId())); + if (topicOffset == null) { + throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader); + } + //TODO check the max offset, will it return -1? + if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { + throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); + } + newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize)); + TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); + //fresh the new leader + TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items); + } + } + //Step4: write to the new leader with logic offset + for (String broker: brokersToMapIn) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); + } + //Step5: write the non-target brokers + for (String broker: brokerConfigMap.keySet()) { + if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) { + continue; + } + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); + } + } + + public static Map examineTopicConfigAll(String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, InterruptedException, MQBrokerException { + Map brokerConfigMap = new HashMap<>(); + ClientMetadata clientMetadata = new ClientMetadata(); + boolean getFromBrokers = false; + TopicRouteData routeData = null; + try { + routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); + } catch (MQClientException exception) { + if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage()); + } else { + getFromBrokers = true; + } + } + if (!getFromBrokers) { + if (routeData != null + && !routeData.getQueueDatas().isEmpty()) { + clientMetadata.freshTopicRoute(topic, routeData); + for (QueueData queueData: routeData.getQueueDatas()) { + String bname = queueData.getBrokerName(); + String addr = clientMetadata.findMasterBrokerAddr(bname); + try { + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + brokerConfigMap.put(bname, mapping); + } + } catch (MQBrokerException exception) { + if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw exception; + } + } + + } + } + } else { + //if cannot get from nameserver, then check all the brokers + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo != null + && clusterInfo.getBrokerAddrTable() != null) { + clientMetadata.refreshClusterInfo(clusterInfo); + } + for (Map.Entry> entry : clientMetadata.getBrokerAddrTable().entrySet()) { + String bname = entry.getKey(); + HashMap map = entry.getValue(); + String addr = map.get(MixAll.MASTER_ID); + if (addr != null) { + try { + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + brokerConfigMap.put(bname, mapping); + } + } catch (MQBrokerException exception1) { + if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw exception1; + } + } + } + } + } + return brokerConfigMap; + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java index b516f1c19ea93479d1af686152eadddd3913e284..9a6f15a2cb07fc2fded5a64f9dc32ac15e280f07 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -20,6 +20,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; @@ -30,6 +31,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.MQAdminUtils; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; @@ -87,7 +89,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - ClientMetadata clientMetadata = new ClientMetadata(); try { String topic = commandLine.getOptionValue('t').trim(); @@ -99,24 +100,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand { TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, wrapper.getBrokerConfigMap()); TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - if (clusterInfo == null - || clusterInfo.getClusterAddrTable().isEmpty()) { - throw new RuntimeException("The Cluster info is empty"); - } - clientMetadata.refreshClusterInfo(clusterInfo); boolean force = false; if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) { force = true; } - for (String broker : wrapper.getBrokerConfigMap().keySet()) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - if (addr == null) { - throw new RuntimeException("Can't find addr for broker " + broker); - } - } - defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force); + MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force, defaultMQAdminExt); return; }catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); @@ -159,7 +148,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand { || clusterInfo.getClusterAddrTable().isEmpty()) { throw new RuntimeException("The Cluster info is empty"); } - clientMetadata.refreshClusterInfo(clusterInfo); { if (commandLine.hasOption("b")) { String brokerStrs = commandLine.getOptionValue("b").trim(); @@ -186,7 +174,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { } } - brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); if (brokerConfigMap.isEmpty()) { throw new RuntimeException("No topic route to do the remapping"); } @@ -204,7 +192,9 @@ public class RemappingStaticTopicSubCommand implements SubCommand { System.out.println("The old mapping data is written to file " + newMappingDataFile); } - defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false); + MQAdminUtils.completeNoTargetBrokers(newWrapper.getBrokerConfigMap(), defaultMQAdminExt); + + MQAdminUtils.remappingStaticTopic(topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false, defaultMQAdminExt); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index a32dc8d08141aa2dd59abb4b13867f8d295af554..958922ecb73a992423247fd72bcbc7fe47f67c0f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.MQAdminUtils; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; @@ -106,14 +107,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - if (clusterInfo == null - || clusterInfo.getClusterAddrTable().isEmpty()) { - throw new RuntimeException("The Cluster info is empty"); - } - clientMetadata.refreshClusterInfo(clusterInfo); - - doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force); + MQAdminUtils.completeNoTargetBrokers(wrapper.getBrokerConfigMap(), defaultMQAdminExt); + MQAdminUtils.updateTopicConfigMappingAll(wrapper.getBrokerConfigMap(), defaultMQAdminExt, false); return; }catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); @@ -122,22 +117,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } } - public void doUpdate(Map brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception { - //check it before - for (String broker : brokerConfigMap.keySet()) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - if (addr == null) { - throw new RuntimeException("Can't find addr for broker " + broker); - } - } - //If some succeed, and others fail, it will cause inconsistent data - for (Map.Entry entry : brokerConfigMap.entrySet()) { - String broker = entry.getKey(); - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicConfigAndQueueMapping configMapping = entry.getValue(); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); - } - } @Override @@ -155,7 +134,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - ClientMetadata clientMetadata = new ClientMetadata(); Map brokerConfigMap = new HashMap<>(); Set targetBrokers = new HashSet<>(); @@ -173,7 +151,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { || clusterInfo.getClusterAddrTable().isEmpty()) { throw new RuntimeException("The Cluster info is empty"); } - clientMetadata.refreshClusterInfo(clusterInfo); { if (commandLine.hasOption("b")) { String brokerStrs = commandLine.getOptionValue("b").trim(); @@ -192,17 +169,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand { if (targetBrokers.isEmpty()) { throw new RuntimeException("Find none brokers, do nothing"); } - for (String broker : targetBrokers) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - if (addr == null) { - throw new RuntimeException("Can't find addr for broker " + broker); - } - } } //get the existed topic config and mapping - brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum); @@ -219,15 +190,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand { targetBrokers.addAll(brokerConfigMap.keySet()); //calculate the new data - TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap); + TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); { String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true); System.out.println("The new mapping data is written to file " + newMappingDataFile); } - doUpdate(newWrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false); - + MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt); + MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally {