提交 2dd65949 编写于 作者: D dongeforever

Refactor the admin code, reduce exposing apis

上级 5b777932
...@@ -422,9 +422,8 @@ public class TopicQueueMappingUtils { ...@@ -422,9 +422,8 @@ public class TopicQueueMappingUtils {
} }
} }
public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Set<String> nonTargetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) { public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
checkTargetBrokersComplete(targetBrokers, brokerConfigMap); checkTargetBrokersComplete(targetBrokers, brokerConfigMap);
checkNonTargetBrokers(targetBrokers, nonTargetBrokers);
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>(); Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum); Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) { if (!brokerConfigMap.isEmpty()) {
...@@ -484,12 +483,6 @@ public class TopicQueueMappingUtils { ...@@ -484,12 +483,6 @@ public class TopicQueueMappingUtils {
TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem))); TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem)));
} }
//set the non target brokers
for (String broker : nonTargetBrokers) {
if (!brokerConfigMap.containsKey(broker)) {
brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch)));
}
}
// set the topic config // set the topic config
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) { for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue(); TopicConfigAndQueueMapping configMapping = entry.getValue();
......
...@@ -128,9 +128,9 @@ public class TopicQueueMappingUtilsTest { ...@@ -128,9 +128,9 @@ public class TopicQueueMappingUtilsTest {
Set<String> targetBrokers = buildTargetBrokers(2 * i); Set<String> targetBrokers = buildTargetBrokers(2 * i);
Set<String> nonTargetBrokers = buildTargetBrokers(2 * i, "test"); Set<String> nonTargetBrokers = buildTargetBrokers(2 * i, "test");
queueNum = 10 * i; queueNum = 10 * i;
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, nonTargetBrokers, brokerConfigMap); TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(4 * i, brokerConfigMap.size()); Assert.assertEquals(2 * i, brokerConfigMap.size());
//do the check manually //do the check manually
Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
...@@ -168,7 +168,7 @@ public class TopicQueueMappingUtilsTest { ...@@ -168,7 +168,7 @@ public class TopicQueueMappingUtilsTest {
int queueNum = 7; int queueNum = 7;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>(); Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> originalBrokers = buildTargetBrokers(2); Set<String> originalBrokers = buildTargetBrokers(2);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap); TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size()); Assert.assertEquals(2, brokerConfigMap.size());
...@@ -213,7 +213,7 @@ public class TopicQueueMappingUtilsTest { ...@@ -213,7 +213,7 @@ public class TopicQueueMappingUtilsTest {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>(); Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> originalBrokers = buildTargetBrokers(2); Set<String> originalBrokers = buildTargetBrokers(2);
{ {
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap); TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size()); Assert.assertEquals(2, brokerConfigMap.size());
} }
...@@ -234,7 +234,7 @@ public class TopicQueueMappingUtilsTest { ...@@ -234,7 +234,7 @@ public class TopicQueueMappingUtilsTest {
int queueNum = 10; int queueNum = 10;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>(); Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> targetBrokers = buildTargetBrokers(2); Set<String> targetBrokers = buildTargetBrokers(2);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<String>(), brokerConfigMap); TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size()); Assert.assertEquals(2, brokerConfigMap.size());
TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next(); TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.rocketmq.test.util; package org.apache.rocketmq.test.util;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
...@@ -25,12 +26,16 @@ import org.apache.log4j.Logger; ...@@ -25,12 +26,16 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.CommandUtil;
public class MQAdmin { public class MQAdminTestUtils {
private static Logger log = Logger.getLogger(MQAdmin.class); private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
public static boolean createTopic(String nameSrvAddr, String clusterName, String topic, public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
int queueNum) { int queueNum) {
...@@ -163,4 +168,23 @@ public class MQAdmin { ...@@ -163,4 +168,23 @@ public class MQAdmin {
ForkJoinPool.commonPool().execute(mqAdminExt::shutdown); ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
} }
//should only be test, if some middle operation failed, it dose not backup the brokerConfigMap
public static Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
assert brokerConfigMap.isEmpty();
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false);
return brokerConfigMap;
}
//should only be test, if some middle operation failed, it dose not backup the brokerConfigMap
public static void remappingStaticTopic(String topic, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
assert !brokerConfigMap.isEmpty();
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
}
} }
...@@ -31,8 +31,6 @@ import org.apache.log4j.Logger; ...@@ -31,8 +31,6 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer; import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
...@@ -47,7 +45,7 @@ import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer; ...@@ -47,7 +45,7 @@ import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.factory.ConsumerFactory; import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener; import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdmin; import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils; import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt;
...@@ -122,7 +120,7 @@ public class BaseConf { ...@@ -122,7 +120,7 @@ public class BaseConf {
} }
public static String initConsumerGroup(String group) { public static String initConsumerGroup(String group) {
MQAdmin.createSub(nsAddr, clusterName, group); MQAdminTestUtils.createSub(nsAddr, clusterName, group);
return group; return group;
} }
......
...@@ -34,7 +34,7 @@ import org.apache.rocketmq.namesrv.NamesrvController; ...@@ -34,7 +34,7 @@ import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.util.MQAdmin; import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.TestUtils;
public class IntegrationTestBase { public class IntegrationTestBase {
...@@ -166,7 +166,7 @@ public class IntegrationTestBase { ...@@ -166,7 +166,7 @@ public class IntegrationTestBase {
boolean createResult; boolean createResult;
while (true) { while (true) {
createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, queueNumbers); createResult = MQAdminTestUtils.createTopic(nsAddr, clusterName, topic, queueNumbers);
if (createResult) { if (createResult) {
break; break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) { } else if (System.currentTimeMillis() - startTime > topicCreateTime) {
......
package org.apache.rocketmq.test.smoke; package org.apache.rocketmq.test.smoke;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
...@@ -12,20 +10,20 @@ import org.apache.rocketmq.common.rpc.ClientMetadata; ...@@ -12,20 +10,20 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils; import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.VerifyUtils; import org.apache.rocketmq.test.util.VerifyUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.FixMethodOrder; import org.junit.FixMethodOrder;
import org.junit.Test; import org.junit.Test;
import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
...@@ -45,50 +43,44 @@ public class StaticTopicIT extends BaseConf { ...@@ -45,50 +43,44 @@ public class StaticTopicIT extends BaseConf {
private static Logger logger = Logger.getLogger(StaticTopicIT.class); private static Logger logger = Logger.getLogger(StaticTopicIT.class);
private DefaultMQAdminExt defaultMQAdminExt; private DefaultMQAdminExt defaultMQAdminExt;
private ClientMetadata clientMetadata;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
System.setProperty("rocketmq.client.rebalance.waitInterval", "500"); System.setProperty("rocketmq.client.rebalance.waitInterval", "500");
defaultMQAdminExt = getAdmin(nsAddr); defaultMQAdminExt = getAdmin(nsAddr);
waitBrokerRegistered(nsAddr, clusterName); waitBrokerRegistered(nsAddr, clusterName);
clientMetadata = new ClientMetadata();
defaultMQAdminExt.start(); defaultMQAdminExt.start();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
} }
public Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers) throws Exception { @Test
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); public void testNoTargetBrokers() throws Exception {
Assert.assertTrue(brokerConfigMap.isEmpty()); String topic = "static" + MQRandomUtils.getRandomTopic();
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap); int queueNum = 10;
Assert.assertEquals(targetBrokers.size(), brokerConfigMap.size()); {
//If some succeed, and others fail, it will cause inconsistent data Set<String> targetBrokers = new HashSet<>();
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) { targetBrokers.add(broker1Name);
String broker = entry.getKey(); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
String addr = clientMetadata.findMasterBrokerAddr(broker); Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
TopicConfigAndQueueMapping configMapping = entry.getValue(); Assert.assertEquals(2, remoteBrokerConfigMap.size());
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
TopicConfigAndQueueMapping configMapping = remoteBrokerConfigMap.get(broker2Name);
Assert.assertEquals(0, configMapping.getWriteQueueNums());
Assert.assertEquals(0, configMapping.getReadQueueNums());
Assert.assertEquals(0, configMapping.getMappingDetail().getHostedQueues().size());
} }
return brokerConfigMap;
}
public void remappingStaticTopic(String topic, Set<String> targetBrokers) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertFalse(brokerConfigMap.isEmpty());
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false);
}
@Test {
public void testNonTargetBrokers() { Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
Assert.assertEquals(2, remoteBrokerConfigMap.size());
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
}
} }
...@@ -102,10 +94,10 @@ public class StaticTopicIT extends BaseConf { ...@@ -102,10 +94,10 @@ public class StaticTopicIT extends BaseConf {
int queueNum = 10; int queueNum = 10;
int msgEachQueue = 100; int msgEachQueue = 100;
//create static topic //create static topic
Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = createStaticTopic(topic, queueNum, getBrokers()); Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
//check the static topic config //check the static topic config
{ {
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
Assert.assertEquals(2, remoteBrokerConfigMap.size()); Assert.assertEquals(2, remoteBrokerConfigMap.size());
for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet()) { for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet()) {
String broker = entry.getKey(); String broker = entry.getKey();
...@@ -178,7 +170,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -178,7 +170,7 @@ public class StaticTopicIT extends BaseConf {
} }
@Test @Test
public void testDoubleReadCheck() throws Exception { public void testDoubleReadCheckConsumerOffset() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic(); String topic = "static" + MQRandomUtils.getRandomTopic();
String group = initConsumerGroup(); String group = initConsumerGroup();
RMQNormalProducer producer = getProducer(nsAddr, topic); RMQNormalProducer producer = getProducer(nsAddr, topic);
...@@ -194,7 +186,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -194,7 +186,7 @@ public class StaticTopicIT extends BaseConf {
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker1Name); targetBrokers.add(broker1Name);
createStaticTopic(topic, queueNum, targetBrokers); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
} }
//produce the messages //produce the messages
{ {
...@@ -217,7 +209,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -217,7 +209,7 @@ public class StaticTopicIT extends BaseConf {
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name); targetBrokers.add(broker2Name);
remappingStaticTopic(topic, targetBrokers); MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
} }
//make the metadata //make the metadata
...@@ -226,8 +218,8 @@ public class StaticTopicIT extends BaseConf { ...@@ -226,8 +218,8 @@ public class StaticTopicIT extends BaseConf {
{ {
producer = getProducer(nsAddr, topic); producer = getProducer(nsAddr, topic);
ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
//just refresh the metadata //just refresh the metadata
defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
List<MessageQueue> messageQueueList = producer.getMessageQueue(); List<MessageQueue> messageQueueList = producer.getMessageQueue();
for(MessageQueue messageQueue: messageQueueList) { for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue); producer.send(msgEachQueue, messageQueue);
...@@ -276,7 +268,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -276,7 +268,7 @@ public class StaticTopicIT extends BaseConf {
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker1Name); targetBrokers.add(broker1Name);
createStaticTopic(topic, queueNum, targetBrokers); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
} }
//System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name)); //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
//System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name)); //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
...@@ -310,8 +302,8 @@ public class StaticTopicIT extends BaseConf { ...@@ -310,8 +302,8 @@ public class StaticTopicIT extends BaseConf {
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name); targetBrokers.add(broker2Name);
remappingStaticTopic(topic, targetBrokers); MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
...@@ -324,6 +316,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -324,6 +316,7 @@ public class StaticTopicIT extends BaseConf {
Thread.sleep(500); Thread.sleep(500);
producer.setDebug(true); producer.setDebug(true);
{ {
ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
List<MessageQueue> messageQueueList = producer.getMessageQueue(); List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) { for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i); MessageQueue messageQueue = messageQueueList.get(i);
......
...@@ -219,11 +219,6 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { ...@@ -219,11 +219,6 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.examineTopicConfig(addr, topic); return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
} }
@Override
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic);
}
@Override @Override
public TopicStatsTable examineTopicStats( public TopicStatsTable examineTopicStats(
String topic) throws RemotingException, MQClientException, InterruptedException, String topic) throws RemotingException, MQClientException, InterruptedException,
...@@ -621,9 +616,5 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { ...@@ -621,9 +616,5 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force); this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
} }
@Override
public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQAdminExtImpl.remappingStaticTopic(clientMetadata, topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, blockSeqSize, force);
}
} }
...@@ -1055,139 +1055,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -1055,139 +1055,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
} }
@Override
public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
for (String broker : brokerConfigMap.keySet()) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
List<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step5: write the non-target brokers
for (String broker: brokerConfigMap.keySet()) {
if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
continue;
}
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
@Override
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
boolean getFromBrokers = false;
TopicRouteData routeData = null;
try {
routeData = examineTopicRouteInfo(topic);
} catch (MQClientException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
} else {
getFromBrokers = true;
}
}
if (!getFromBrokers) {
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
clientMetadata.freshTopicRoute(topic, routeData);
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
try {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQBrokerException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
}
}
}
} else {
log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic);
//if cannot get from nameserver, then check all the brokers
ClusterInfo clusterInfo = examineBrokerClusterInfo();
if (clusterInfo != null
&& clusterInfo.getBrokerAddrTable() != null) {
clientMetadata.refreshClusterInfo(clusterInfo);
}
for (Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
String bname = entry.getKey();
HashMap<Long, String> map = entry.getValue();
String addr = map.get(MixAll.MASTER_ID);
if (addr != null) {
try {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQBrokerException exception1) {
if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception1;
}
}
}
}
}
return brokerConfigMap;
}
@Override @Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
......
...@@ -320,7 +320,4 @@ public interface MQAdminExt extends MQAdmin { ...@@ -320,7 +320,4 @@ public interface MQAdminExt extends MQAdmin {
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException; void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException;
Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException;
void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
} }
package org.apache.rocketmq.tools.admin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MQAdminUtils {
public static ClientMetadata getBrokerMetadata(DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
ClientMetadata clientMetadata = new ClientMetadata();
refreshClusterInfo(defaultMQAdminExt, clientMetadata);
return clientMetadata;
}
public static ClientMetadata getBrokerAndTopicMetadata(String topic, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingException, MQBrokerException {
ClientMetadata clientMetadata = new ClientMetadata();
refreshClusterInfo(defaultMQAdminExt, clientMetadata);
refreshTopicRouteInfo(topic, defaultMQAdminExt, clientMetadata);
return clientMetadata;
}
public static void refreshClusterInfo(DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
}
public static void refreshTopicRouteInfo(String topic, DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) throws RemotingException, InterruptedException, MQBrokerException {
TopicRouteData routeData = null;
try {
routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
} catch (MQClientException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
}
}
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
clientMetadata.freshTopicRoute(topic, routeData);
}
}
public static Set<String> getAllBrokersInSameCluster(Collection<String> brokers, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
Set<String> allBrokers = new HashSet<>();
for (String broker: brokers) {
if (allBrokers.contains(broker)) {
continue;
}
for (Set<String> clusterBrokers : clusterInfo.getClusterAddrTable().values()) {
if (clusterBrokers.contains(broker)) {
allBrokers.addAll(clusterBrokers);
break;
}
}
}
return allBrokers;
}
public static void completeNoTargetBrokers(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
String topic = configMapping.getTopicName();
int queueNum = configMapping.getMappingDetail().getTotalQueues();
long newEpoch = configMapping.getMappingDetail().getEpoch();
Set<String> allBrokers = getAllBrokersInSameCluster(brokerConfigMap.keySet(), defaultMQAdminExt);
for (String broker: allBrokers) {
if (!brokerConfigMap.containsKey(broker)) {
brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch)));
}
}
}
public static void checkIfMasterAlive(Collection<String> brokers, DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) {
for (String broker : brokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
}
public static void updateTopicConfigMappingAll(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
ClientMetadata clientMetadata = getBrokerMetadata(defaultMQAdminExt);
checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
public static void remappingStaticTopic(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt);
MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
List<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step5: write the non-target brokers
for (String broker: brokerConfigMap.keySet()) {
if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
continue;
}
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
public static Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, InterruptedException, MQBrokerException {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
ClientMetadata clientMetadata = new ClientMetadata();
boolean getFromBrokers = false;
TopicRouteData routeData = null;
try {
routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
} catch (MQClientException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
} else {
getFromBrokers = true;
}
}
if (!getFromBrokers) {
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
clientMetadata.freshTopicRoute(topic, routeData);
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
try {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQBrokerException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
}
}
}
} else {
//if cannot get from nameserver, then check all the brokers
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo != null
&& clusterInfo.getBrokerAddrTable() != null) {
clientMetadata.refreshClusterInfo(clusterInfo);
}
for (Map.Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
String bname = entry.getKey();
HashMap<Long, String> map = entry.getValue();
String addr = map.get(MixAll.MASTER_ID);
if (addr != null) {
try {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQBrokerException exception1) {
if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception1;
}
}
}
}
}
return brokerConfigMap;
}
}
...@@ -20,6 +20,7 @@ import org.apache.commons.cli.CommandLine; ...@@ -20,6 +20,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
...@@ -30,6 +31,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; ...@@ -30,6 +31,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException; import org.apache.rocketmq.tools.command.SubCommandException;
...@@ -87,7 +89,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -87,7 +89,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
try { try {
String topic = commandLine.getOptionValue('t').trim(); String topic = commandLine.getOptionValue('t').trim();
...@@ -99,24 +100,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -99,24 +100,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, wrapper.getBrokerConfigMap()); TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
boolean force = false; boolean force = false;
if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) { if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
force = true; force = true;
} }
for (String broker : wrapper.getBrokerConfigMap().keySet()) { MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force, defaultMQAdminExt);
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force);
return; return;
}catch (Exception e) { }catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
...@@ -159,7 +148,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -159,7 +148,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
|| clusterInfo.getClusterAddrTable().isEmpty()) { || clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty"); throw new RuntimeException("The Cluster info is empty");
} }
clientMetadata.refreshClusterInfo(clusterInfo);
{ {
if (commandLine.hasOption("b")) { if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim(); String brokerStrs = commandLine.getOptionValue("b").trim();
...@@ -186,7 +174,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -186,7 +174,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
} }
} }
brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
if (brokerConfigMap.isEmpty()) { if (brokerConfigMap.isEmpty()) {
throw new RuntimeException("No topic route to do the remapping"); throw new RuntimeException("No topic route to do the remapping");
} }
...@@ -204,7 +192,9 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -204,7 +192,9 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
System.out.println("The old mapping data is written to file " + newMappingDataFile); System.out.println("The old mapping data is written to file " + newMappingDataFile);
} }
defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false); MQAdminUtils.completeNoTargetBrokers(newWrapper.getBrokerConfigMap(), defaultMQAdminExt);
MQAdminUtils.remappingStaticTopic(topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false, defaultMQAdminExt);
} catch (Exception e) { } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
...@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; ...@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException; import org.apache.rocketmq.tools.command.SubCommandException;
...@@ -106,14 +107,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -106,14 +107,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
} }
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); MQAdminUtils.completeNoTargetBrokers(wrapper.getBrokerConfigMap(), defaultMQAdminExt);
if (clusterInfo == null MQAdminUtils.updateTopicConfigMappingAll(wrapper.getBrokerConfigMap(), defaultMQAdminExt, false);
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
return; return;
}catch (Exception e) { }catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
...@@ -122,22 +117,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -122,22 +117,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
} }
} }
public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
//check it before
for (String broker : brokerConfigMap.keySet()) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
@Override @Override
...@@ -155,7 +134,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -155,7 +134,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>(); Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = new HashSet<>();
...@@ -173,7 +151,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -173,7 +151,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
|| clusterInfo.getClusterAddrTable().isEmpty()) { || clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty"); throw new RuntimeException("The Cluster info is empty");
} }
clientMetadata.refreshClusterInfo(clusterInfo);
{ {
if (commandLine.hasOption("b")) { if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim(); String brokerStrs = commandLine.getOptionValue("b").trim();
...@@ -192,17 +169,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -192,17 +169,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
if (targetBrokers.isEmpty()) { if (targetBrokers.isEmpty()) {
throw new RuntimeException("Find none brokers, do nothing"); throw new RuntimeException("Find none brokers, do nothing");
} }
for (String broker : targetBrokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
} }
//get the existed topic config and mapping //get the existed topic config and mapping
brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum); Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
...@@ -219,15 +190,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -219,15 +190,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
targetBrokers.addAll(brokerConfigMap.keySet()); targetBrokers.addAll(brokerConfigMap.keySet());
//calculate the new data //calculate the new data
TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap); TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
{ {
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true); String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The new mapping data is written to file " + newMappingDataFile); System.out.println("The new mapping data is written to file " + newMappingDataFile);
} }
doUpdate(newWrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false); MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false);
} catch (Exception e) { } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally { } finally {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册