diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 3eceb49134778601413b1974db1eacc48623e0d6..0341851ba2aec27efb2a6508ddaca907b9d05f2b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1880,7 +1880,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic()); if (topicConfig == null) { log.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic()); - response.setCode(ResponseCode.SYSTEM_ERROR); + //be care of the response code, should set "not-exist" explictly + response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic()); return response; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 050fc30808955b9f3aa37ae391c572083a141fa1..b229283e38b9b35445be24b287c8e887d45cd2ee 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -332,7 +332,7 @@ public class MQClientAPIImpl { public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); @@ -2708,7 +2708,7 @@ public class MQClientAPIImpl { public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic, long timeoutMillis) throws InterruptedException, - RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); header.setTopic(topic); header.setWithMapping(true); @@ -2720,15 +2720,19 @@ public class MQClientAPIImpl { case ResponseCode.SUCCESS: { return RemotingSerializable.decode(response.getBody(), TopicConfigAndQueueMapping.class); } + //should check the exist + case ResponseCode.TOPIC_NOT_EXIST: { + //should return null? + break; + } default: break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQClientException(response.getCode(), response.getRemark()); } public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force, - final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); @@ -2753,6 +2757,6 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQClientException(response.getCode(), response.getRemark()); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java index 499b6a87fb1cedeae5c3f56f3faf3a097d23568b..078f616d644f0240b4863c8b1db1d7918eabd625 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java @@ -91,6 +91,10 @@ public class ClientMetadata { return brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID); } + public ConcurrentMap> getBrokerAddrTable() { + return brokerAddrTable; + } + public static ConcurrentMap topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) { if (route.getTopicQueueMappingByBroker() == null || route.getTopicQueueMappingByBroker().isEmpty()) { diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index 9dd116dd65f30f279dd9b4ccc717ad096ee8a3d4..bb21dc294ad7e2148f6a4a7efddcf59f38e71012 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -33,13 +33,13 @@ public class StaticTopicIT extends BaseConf { defaultMQAdminExt = getAdmin(nsAddr); waitBrokerRegistered(nsAddr, clusterName); clientMetadata = new ClientMetadata(); + defaultMQAdminExt.start(); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); if (clusterInfo == null || clusterInfo.getClusterAddrTable().isEmpty()) { throw new RuntimeException("The Cluster info is empty"); } clientMetadata.refreshClusterInfo(clusterInfo); - } @Test diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 498e8358924d24ae196e8f11321aa9dd05ddd3a8..d805f8dcff28996ee9a12f49c992055aa30835ea 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -211,8 +211,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException { + public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, InterruptedException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis); } @@ -258,7 +257,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis); } @@ -1171,20 +1170,63 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - TopicRouteData routeData = examineTopicRouteInfo(topic); - clientMetadata.freshTopicRoute(topic, routeData); + public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException { Map brokerConfigMap = new HashMap<>(); + try { + TopicRouteData routeData = examineTopicRouteInfo(topic); + clientMetadata.freshTopicRoute(topic, routeData); + if (routeData != null + && !routeData.getQueueDatas().isEmpty()) { + for (QueueData queueData: routeData.getQueueDatas()) { + String bname = queueData.getBrokerName(); + String addr = clientMetadata.findMasterBrokerAddr(bname); + try { + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + brokerConfigMap.put(bname, mapping); + } + } catch (MQClientException exception) { + if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw exception; + } + + } + + } + } + } catch (MQClientException exception) { + if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw exception; + } + log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic); + //if cannot get from nameserver, then check all the brokers + try { + ClusterInfo clusterInfo = examineBrokerClusterInfo(); + if (clusterInfo != null + && clusterInfo.getBrokerAddrTable() != null) { + clientMetadata.refreshClusterInfo(clusterInfo); + } + }catch (MQBrokerException e) { + throw new MQClientException(e.getResponseCode(), e.getMessage()); + } + for (Entry> entry : clientMetadata.getBrokerAddrTable().entrySet()) { + String bname = entry.getKey(); + HashMap map = entry.getValue(); + String addr = map.get(MixAll.MASTER_ID); + if (addr != null) { + try { + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + brokerConfigMap.put(bname, mapping); + } + } catch (MQClientException clientException) { + if (clientException.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw clientException; + } + } - if (routeData != null - && !routeData.getQueueDatas().isEmpty()) { - for (QueueData queueData: routeData.getQueueDatas()) { - String bname = queueData.getBrokerName(); - String addr = clientMetadata.findMasterBrokerAddr(bname); - TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); - //allow the config is null - if (mapping != null) { - brokerConfigMap.put(bname, mapping); } } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index f01ccc3bf820f7ea64c3e6bcd8a670f5177001ed..60a366d5d2625a4e9071a2fec99cf96e71a58114 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -109,7 +109,7 @@ public interface MQAdminExt extends MQAdmin { SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException; TopicConfig examineTopicConfig(final String addr, - final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; + final String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; TopicStatsTable examineTopicStats( final String topic) throws RemotingException, MQClientException, InterruptedException, @@ -344,11 +344,9 @@ public interface MQAdminExt extends MQAdmin { LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; - void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQClientException; - Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException; void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set brokersToMapIn, Set brokersToMapOut, Map brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;