From 2047f94d81fe4b70006c950b72f814c8eb5ba112 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Mon, 28 May 2018 16:38:13 +0800 Subject: [PATCH] [BugFix] When deleting topics, instruct both master and slave nodes to delete it such that consume queues are all deleted --- .../rocketmq/tools/command/CommandUtil.java | 36 ++++++++----------- .../command/topic/DeleteTopicSubCommand.java | 4 +-- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java index edc9144d..2e65f980 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java @@ -35,6 +35,9 @@ import org.apache.rocketmq.tools.admin.MQAdminExt; public class CommandUtil { + private static final String ERROR_MESSAGE = "Make sure the specified clusterName exists or the name server " + + "connected to is correct."; + public static Map/*slave addr*/> fetchMasterAndSlaveDistinguish( final MQAdminExt adminExt, final String clusterName) throws InterruptedException, RemotingConnectException, @@ -46,8 +49,7 @@ public class CommandUtil { Set brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); if (brokerNameSet == null) { - System.out - .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); + System.out.printf("[error] %s", ERROR_MESSAGE); return masterAndSlaveMap; } @@ -62,8 +64,7 @@ public class CommandUtil { masterAndSlaveMap.put(masterAddr, new ArrayList()); for (Long id : brokerData.getBrokerAddrs().keySet()) { - if (brokerData.getBrokerAddrs().get(id) == null - || id.longValue() == MixAll.MASTER_ID) { + if (brokerData.getBrokerAddrs().get(id) == null || id == MixAll.MASTER_ID) { continue; } @@ -95,8 +96,7 @@ public class CommandUtil { } } } else { - System.out - .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); + System.out.printf("[error] %s", ERROR_MESSAGE); } return masterSet; @@ -105,26 +105,22 @@ public class CommandUtil { public static Set fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, final String clusterName) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException { - Set masterSet = new HashSet(); - + Set brokerAddressSet = new HashSet(); ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); - Set brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); - if (brokerNameSet != null) { for (String brokerName : brokerNameSet) { BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); if (brokerData != null) { final Collection addrs = brokerData.getBrokerAddrs().values(); - masterSet.addAll(addrs); + brokerAddressSet.addAll(addrs); } } } else { - System.out - .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); + System.out.printf("[error] %s", ERROR_MESSAGE); } - return masterSet; + return brokerAddressSet; } public static Set fetchBrokerNameByClusterName(final MQAdminExt adminExt, final String clusterName) @@ -132,25 +128,23 @@ public class CommandUtil { ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); Set brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); if (brokerNameSet.isEmpty()) { - throw new Exception( - "Make sure the specified clusterName exists or the nameserver which connected is correct."); + throw new Exception(ERROR_MESSAGE); } return brokerNameSet; } public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, final String addr) throws Exception { ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); - HashMap brokerAddrTable = - clusterInfoSerializeWrapper.getBrokerAddrTable(); + HashMap brokerAddrTable = clusterInfoSerializeWrapper.getBrokerAddrTable(); Iterator> it = brokerAddrTable.entrySet().iterator(); while (it.hasNext()) { Map.Entry entry = it.next(); HashMap brokerAddrs = entry.getValue().getBrokerAddrs(); - if (brokerAddrs.containsValue(addr)) + if (brokerAddrs.containsValue(addr)) { return entry.getKey(); + } } - throw new Exception( - "Make sure the specified broker addr exists or the nameserver which connected is correct."); + throw new Exception(ERROR_MESSAGE); } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java index 25d36ce8..6cb3f18b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java @@ -38,8 +38,8 @@ public class DeleteTopicSubCommand implements SubCommand { final String topic ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { - Set masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName); - adminExt.deleteTopicInBroker(masterSet, topic); + Set brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName); + adminExt.deleteTopicInBroker(brokerAddressSet, topic); System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName); Set nameServerSet = null; -- GitLab