diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java index 69420913e46c9b324c2fcb6161570974cf234e1f..3c23bf3dcf5ad9e6a258a25d245f8cccc5d75961 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java @@ -16,12 +16,16 @@ */ package org.apache.rocketmq.tools.command.topic; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.RPCHook; @@ -67,46 +71,91 @@ public class UpdateTopicPermSubCommand implements SubCommand { @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { defaultMQAdminExt.start(); - TopicConfig topicConfig = new TopicConfig(); - String topic = commandLine.getOptionValue('t').trim(); + TopicConfig topicConfig = new TopicConfig(); + String topic; + if (commandLine.hasOption('t')) { + topic = commandLine.getOptionValue('t').trim(); + } else { + System.out.printf("topic paramter value must be need.%n"); + return; + } TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); assert topicRouteData != null; List queueDatas = topicRouteData.getQueueDatas(); assert queueDatas != null && queueDatas.size() > 0; - QueueData queueData = queueDatas.get(0); topicConfig.setTopicName(topic); topicConfig.setWriteQueueNums(queueData.getWriteQueueNums()); topicConfig.setReadQueueNums(queueData.getReadQueueNums()); - topicConfig.setPerm(queueData.getPerm()); topicConfig.setTopicSysFlag(queueData.getTopicSynFlag()); - //new perm - int perm = Integer.parseInt(commandLine.getOptionValue('p').trim()); - int oldPerm = topicConfig.getPerm(); - if (perm == oldPerm) { - System.out.printf("new perm equals to the old one!%n"); + int perm; + if (commandLine.hasOption('p')) { + perm = Integer.parseInt(commandLine.getOptionValue('p').trim()); + } else { + System.out.printf("perm paramter value must be need.%n"); return; } topicConfig.setPerm(perm); if (commandLine.hasOption('b')) { - String addr = commandLine.getOptionValue('b').trim(); - defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); - System.out.printf("update topic perm from %s to %s in %s success.%n", oldPerm, perm, addr); - System.out.printf("%s%n", topicConfig); - return; + String brokerAddr = commandLine.getOptionValue('b').trim(); + List brokerDatas = topicRouteData.getBrokerDatas(); + boolean isBrokerLegal = false; + String brokerName = null; + for (BrokerData data : brokerDatas) { + HashMap brokerAddrs = data.getBrokerAddrs(); + if (brokerAddrs == null || brokerAddrs.size() == 0) { + continue; + } + for (Map.Entry entry : brokerAddrs.entrySet()) { + if (brokerAddr.equals(entry.getValue()) && MixAll.MASTER_ID == entry.getKey()) { + isBrokerLegal = true; + brokerName = data.getBrokerName(); + break; + } + } + if (isBrokerLegal) { + break; + } + } + + if (isBrokerLegal && brokerName != null) { + List queueDataList = topicRouteData.getQueueDatas(); + assert queueDataList != null && queueDataList.size() > 0; + int oldPerm = 0; + for (QueueData data : queueDataList) { + if (brokerName.equals(data.getBrokerName())) { + oldPerm = data.getPerm(); + if (perm == oldPerm) { + System.out.printf("new perm equals to the old one!%n"); + return; + } + break; + } + } + defaultMQAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig); + System.out.printf("update topic perm from %s to %s in %s success.%n", oldPerm, perm, brokerAddr); + System.out.printf("%s.%n", topicConfig); + return; + } else { + System.out.printf("updateTopicPerm error broker not exit or broker is not master!.%n"); + return; + } + } else if (commandLine.hasOption('c')) { String clusterName = commandLine.getOptionValue('c').trim(); Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : masterSet) { defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); - System.out.printf("update topic perm from %s to %s in %s success.%n", oldPerm, perm, addr); + System.out.printf("update topic perm from %s to %s in %s success.%n", queueData.getPerm(), perm, addr); } return; }