From fa39815666187647b6990af7a8a98e1779eb065d Mon Sep 17 00:00:00 2001 From: dongeforever Date: Tue, 23 Nov 2021 21:16:26 +0800 Subject: [PATCH] Catch the exception --- .../processor/AdminBrokerProcessor.java | 3 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 16 +++-- .../rocketmq/common/rpc/ClientMetadata.java | 4 ++ .../rocketmq/test/smoke/StaticTopicIT.java | 2 +- .../tools/admin/DefaultMQAdminExtImpl.java | 72 +++++++++++++++---- .../rocketmq/tools/admin/MQAdminExt.java | 8 +-- 6 files changed, 77 insertions(+), 28 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 3eceb491..0341851b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1880,7 +1880,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic()); if (topicConfig == null) { log.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic()); - response.setCode(ResponseCode.SYSTEM_ERROR); + //be care of the response code, should set "not-exist" explictly + response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic()); return response; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 050fc308..b229283e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -332,7 +332,7 @@ public class MQClientAPIImpl { public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); @@ -2708,7 +2708,7 @@ public class MQClientAPIImpl { public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic, long timeoutMillis) throws InterruptedException, - RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); header.setTopic(topic); header.setWithMapping(true); @@ -2720,15 +2720,19 @@ public class MQClientAPIImpl { case ResponseCode.SUCCESS: { return RemotingSerializable.decode(response.getBody(), TopicConfigAndQueueMapping.class); } + //should check the exist + case ResponseCode.TOPIC_NOT_EXIST: { + //should return null? + break; + } default: break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQClientException(response.getCode(), response.getRemark()); } public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force, - final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); @@ -2753,6 +2757,6 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQClientException(response.getCode(), response.getRemark()); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java index 499b6a87..078f616d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java @@ -91,6 +91,10 @@ public class ClientMetadata { return brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID); } + public ConcurrentMap> getBrokerAddrTable() { + return brokerAddrTable; + } + public static ConcurrentMap topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) { if (route.getTopicQueueMappingByBroker() == null || route.getTopicQueueMappingByBroker().isEmpty()) { diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index 9dd116dd..bb21dc29 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -33,13 +33,13 @@ public class StaticTopicIT extends BaseConf { defaultMQAdminExt = getAdmin(nsAddr); waitBrokerRegistered(nsAddr, clusterName); clientMetadata = new ClientMetadata(); + defaultMQAdminExt.start(); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); if (clusterInfo == null || clusterInfo.getClusterAddrTable().isEmpty()) { throw new RuntimeException("The Cluster info is empty"); } clientMetadata.refreshClusterInfo(clusterInfo); - } @Test diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 498e8358..d805f8dc 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -211,8 +211,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException { + public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, InterruptedException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis); } @@ -258,7 +257,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis); } @@ -1171,20 +1170,63 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - TopicRouteData routeData = examineTopicRouteInfo(topic); - clientMetadata.freshTopicRoute(topic, routeData); + public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException { Map brokerConfigMap = new HashMap<>(); + try { + TopicRouteData routeData = examineTopicRouteInfo(topic); + clientMetadata.freshTopicRoute(topic, routeData); + if (routeData != null + && !routeData.getQueueDatas().isEmpty()) { + for (QueueData queueData: routeData.getQueueDatas()) { + String bname = queueData.getBrokerName(); + String addr = clientMetadata.findMasterBrokerAddr(bname); + try { + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + brokerConfigMap.put(bname, mapping); + } + } catch (MQClientException exception) { + if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw exception; + } + + } + + } + } + } catch (MQClientException exception) { + if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw exception; + } + log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic); + //if cannot get from nameserver, then check all the brokers + try { + ClusterInfo clusterInfo = examineBrokerClusterInfo(); + if (clusterInfo != null + && clusterInfo.getBrokerAddrTable() != null) { + clientMetadata.refreshClusterInfo(clusterInfo); + } + }catch (MQBrokerException e) { + throw new MQClientException(e.getResponseCode(), e.getMessage()); + } + for (Entry> entry : clientMetadata.getBrokerAddrTable().entrySet()) { + String bname = entry.getKey(); + HashMap map = entry.getValue(); + String addr = map.get(MixAll.MASTER_ID); + if (addr != null) { + try { + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + brokerConfigMap.put(bname, mapping); + } + } catch (MQClientException clientException) { + if (clientException.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw clientException; + } + } - if (routeData != null - && !routeData.getQueueDatas().isEmpty()) { - for (QueueData queueData: routeData.getQueueDatas()) { - String bname = queueData.getBrokerName(); - String addr = clientMetadata.findMasterBrokerAddr(bname); - TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); - //allow the config is null - if (mapping != null) { - brokerConfigMap.put(bname, mapping); } } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index f01ccc3b..60a366d5 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -109,7 +109,7 @@ public interface MQAdminExt extends MQAdmin { SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException; TopicConfig examineTopicConfig(final String addr, - final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; + final String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; TopicStatsTable examineTopicStats( final String topic) throws RemotingException, MQClientException, InterruptedException, @@ -344,11 +344,9 @@ public interface MQAdminExt extends MQAdmin { LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; - void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQClientException; - Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException; void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set brokersToMapIn, Set brokersToMapOut, Map brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; -- GitLab