提交 f05cfb92 编写于 作者: D dongeforever

Polish the static topic commands

上级 f225fd05
......@@ -244,6 +244,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
......
......@@ -112,13 +112,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : wrapper.getBrokerConfigMap().entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......@@ -127,6 +121,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
}
public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
}
@Override
public void execute(final CommandLine commandLine, final Options options,
......@@ -147,15 +151,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> brokers = new HashSet<>();
Set<String> targetBrokers = new HashSet<>();
try {
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
|| !commandLine.hasOption('t')
|| !commandLine.hasOption("qn")) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
String topic = commandLine.getOptionValue('t').trim();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
......@@ -163,26 +167,33 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
{
if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim();
for (String broker: brokerStrs.split(",")) {
targetBrokers.add(broker.trim());
}
} else if (commandLine.hasOption("c")) {
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
targetBrokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
}
}
}
}
if (brokers.isEmpty()) {
throw new RuntimeException("Find none brokers for " + clusters);
}
for (String broker : brokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
if (targetBrokers.isEmpty()) {
throw new RuntimeException("Find none brokers, do nothing");
}
for (String broker : targetBrokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
}
//get the existed topic config and mapping
String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
......@@ -223,7 +234,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
//the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
Map<String, Integer> brokerNumMap = targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
final Map<Integer, String> oldIdToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
String leaderbroker = value.getBname();
......@@ -277,13 +288,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
System.out.println("The new mapping data is written to file " + newMappingDataFile);
}
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册