提交 2047f94d 编写于 作者: L Li Zhanhui

[BugFix] When deleting topics, instruct both master and slave nodes to delete...

[BugFix] When deleting topics, instruct both master and slave nodes to delete it such that consume queues are all deleted
上级 2e488e53
...@@ -35,6 +35,9 @@ import org.apache.rocketmq.tools.admin.MQAdminExt; ...@@ -35,6 +35,9 @@ import org.apache.rocketmq.tools.admin.MQAdminExt;
public class CommandUtil { public class CommandUtil {
private static final String ERROR_MESSAGE = "Make sure the specified clusterName exists or the name server " +
"connected to is correct.";
public static Map<String/*master addr*/, List<String>/*slave addr*/> fetchMasterAndSlaveDistinguish( public static Map<String/*master addr*/, List<String>/*slave addr*/> fetchMasterAndSlaveDistinguish(
final MQAdminExt adminExt, final String clusterName) final MQAdminExt adminExt, final String clusterName)
throws InterruptedException, RemotingConnectException, throws InterruptedException, RemotingConnectException,
...@@ -46,8 +49,7 @@ public class CommandUtil { ...@@ -46,8 +49,7 @@ public class CommandUtil {
Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
if (brokerNameSet == null) { if (brokerNameSet == null) {
System.out System.out.printf("[error] %s", ERROR_MESSAGE);
.printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
return masterAndSlaveMap; return masterAndSlaveMap;
} }
...@@ -62,8 +64,7 @@ public class CommandUtil { ...@@ -62,8 +64,7 @@ public class CommandUtil {
masterAndSlaveMap.put(masterAddr, new ArrayList<String>()); masterAndSlaveMap.put(masterAddr, new ArrayList<String>());
for (Long id : brokerData.getBrokerAddrs().keySet()) { for (Long id : brokerData.getBrokerAddrs().keySet()) {
if (brokerData.getBrokerAddrs().get(id) == null if (brokerData.getBrokerAddrs().get(id) == null || id == MixAll.MASTER_ID) {
|| id.longValue() == MixAll.MASTER_ID) {
continue; continue;
} }
...@@ -95,8 +96,7 @@ public class CommandUtil { ...@@ -95,8 +96,7 @@ public class CommandUtil {
} }
} }
} else { } else {
System.out System.out.printf("[error] %s", ERROR_MESSAGE);
.printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
} }
return masterSet; return masterSet;
...@@ -105,26 +105,22 @@ public class CommandUtil { ...@@ -105,26 +105,22 @@ public class CommandUtil {
public static Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, final String clusterName) public static Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, final String clusterName)
throws InterruptedException, RemotingConnectException, RemotingTimeoutException, throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
RemotingSendRequestException, MQBrokerException { RemotingSendRequestException, MQBrokerException {
Set<String> masterSet = new HashSet<String>(); Set<String> brokerAddressSet = new HashSet<String>();
ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
if (brokerNameSet != null) { if (brokerNameSet != null) {
for (String brokerName : brokerNameSet) { for (String brokerName : brokerNameSet) {
BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
if (brokerData != null) { if (brokerData != null) {
final Collection<String> addrs = brokerData.getBrokerAddrs().values(); final Collection<String> addrs = brokerData.getBrokerAddrs().values();
masterSet.addAll(addrs); brokerAddressSet.addAll(addrs);
} }
} }
} else { } else {
System.out System.out.printf("[error] %s", ERROR_MESSAGE);
.printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
} }
return masterSet; return brokerAddressSet;
} }
public static Set<String> fetchBrokerNameByClusterName(final MQAdminExt adminExt, final String clusterName) public static Set<String> fetchBrokerNameByClusterName(final MQAdminExt adminExt, final String clusterName)
...@@ -132,25 +128,23 @@ public class CommandUtil { ...@@ -132,25 +128,23 @@ public class CommandUtil {
ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
if (brokerNameSet.isEmpty()) { if (brokerNameSet.isEmpty()) {
throw new Exception( throw new Exception(ERROR_MESSAGE);
"Make sure the specified clusterName exists or the nameserver which connected is correct.");
} }
return brokerNameSet; return brokerNameSet;
} }
public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, final String addr) throws Exception { public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, final String addr) throws Exception {
ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
HashMap<String/* brokerName */, BrokerData> brokerAddrTable = HashMap<String/* brokerName */, BrokerData> brokerAddrTable = clusterInfoSerializeWrapper.getBrokerAddrTable();
clusterInfoSerializeWrapper.getBrokerAddrTable();
Iterator<Map.Entry<String, BrokerData>> it = brokerAddrTable.entrySet().iterator(); Iterator<Map.Entry<String, BrokerData>> it = brokerAddrTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Map.Entry<String, BrokerData> entry = it.next(); Map.Entry<String, BrokerData> entry = it.next();
HashMap<Long, String> brokerAddrs = entry.getValue().getBrokerAddrs(); HashMap<Long, String> brokerAddrs = entry.getValue().getBrokerAddrs();
if (brokerAddrs.containsValue(addr)) if (brokerAddrs.containsValue(addr)) {
return entry.getKey(); return entry.getKey();
}
} }
throw new Exception( throw new Exception(ERROR_MESSAGE);
"Make sure the specified broker addr exists or the nameserver which connected is correct.");
} }
} }
...@@ -38,8 +38,8 @@ public class DeleteTopicSubCommand implements SubCommand { ...@@ -38,8 +38,8 @@ public class DeleteTopicSubCommand implements SubCommand {
final String topic final String topic
) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName); Set<String> brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName);
adminExt.deleteTopicInBroker(masterSet, topic); adminExt.deleteTopicInBroker(brokerAddressSet, topic);
System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName); System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
Set<String> nameServerSet = null; Set<String> nameServerSet = null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册