From 6c6402868984356136335e437abcadb6c08c95d9 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 24 Nov 2021 14:39:52 +0800 Subject: [PATCH] Polish the json problem --- .../processor/AdminBrokerProcessor.java | 7 ++- .../rocketmq/client/impl/MQAdminImpl.java | 17 ------ .../rocketmq/client/impl/MQClientAPIImpl.java | 8 +-- .../protocol/body/TopicQueueMappingBody.java | 51 ----------------- .../statictopic/LogicQueueMappingItem.java | 12 ++++ .../statictopic/TopicQueueMappingDetail.java | 11 ++++ .../statictopic/TopicQueueMappingInfo.java | 4 ++ .../statictopic/TopicQueueMappingTest.java | 55 +++++++++++-------- .../rocketmq/test/smoke/StaticTopicIT.java | 1 + .../tools/admin/DefaultMQAdminExt.java | 6 +- .../tools/admin/DefaultMQAdminExtImpl.java | 52 +++++++++--------- .../rocketmq/tools/admin/MQAdminExt.java | 9 +-- 12 files changed, 105 insertions(+), 128 deletions(-) delete mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 0341851b..443ae463 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -313,7 +313,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - final TopicQueueMappingBody topicQueueMappingBody = RemotingSerializable.decode(request.getBody(), TopicQueueMappingBody.class); + final TopicQueueMappingDetail topicQueueMappingDetail = RemotingSerializable.decode(request.getBody(), TopicQueueMappingDetail.class); String topic = requestHeader.getTopic(); @@ -338,7 +338,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements try { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail(), force); + System.out.println("Broker body:" + new String(request.getBody())); + System.out.println("Broker bodetaildy:" + topicQueueMappingDetail.toJson()); + + this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); response.setCode(ResponseCode.SUCCESS); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 7d539cd7..29465888 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -80,23 +80,6 @@ public class MQAdminImpl { this.timeoutMillis = timeoutMillis; } - public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, boolean force) throws MQClientException { - MQClientException exception = null; - for (int i = 0; i < 3; i++) { - try { - this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis); - break; - } catch (Exception e) { - if (2 == i) { - exception = new MQClientException("create topic to broker exception", e); - } - } - } - if (exception != null) { - throw exception; - } - } - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, newTopic, queueNum, 0); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index b229283e..d57532c5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -2708,7 +2708,7 @@ public class MQClientAPIImpl { public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic, long timeoutMillis) throws InterruptedException, - RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); header.setTopic(topic); header.setWithMapping(true); @@ -2728,11 +2728,11 @@ public class MQClientAPIImpl { default: break; } - throw new MQClientException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark()); } public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force, - final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException { + final long timeoutMillis) throws RemotingException, InterruptedException, MQBrokerException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); @@ -2757,6 +2757,6 @@ public class MQClientAPIImpl { break; } - throw new MQClientException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark()); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java deleted file mode 100644 index 7e8918f1..00000000 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.common.protocol.body; - -import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - -public class TopicQueueMappingBody extends RemotingSerializable { - - private boolean force; - private int prevGen; - private TopicQueueMappingDetail mappingDetail; - - public int getPrevGen() { - return prevGen; - } - - public void setPrevGen(int prevGen) { - this.prevGen = prevGen; - } - - public TopicQueueMappingDetail getMappingDetail() { - return mappingDetail; - } - - public void setMappingDetail(TopicQueueMappingDetail mappingDetail) { - this.mappingDetail = mappingDetail; - } - - public boolean isForce() { - return force; - } - - public void setForce(boolean force) { - this.force = force; - } -} diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index 479f75d6..b87d2f1a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -100,6 +100,18 @@ public class LogicQueueMappingItem { this.logicOffset = logicOffset; } + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } + + public void setTimeOfStart(long timeOfStart) { + this.timeOfStart = timeOfStart; + } + + public void setTimeOfEnd(long timeOfEnd) { + this.timeOfEnd = timeOfEnd; + } + @Override public String toString() { return "LogicQueueMappingItem{" + diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java index 7117cad9..b80aa9d5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java @@ -29,6 +29,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { // make sure this value is not null private ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); + + + public TopicQueueMappingDetail() { + + } + + public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) { super(topic, totalQueues, bname, epoch); buildIdMap(); @@ -120,6 +127,10 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { return hostedQueues; } + public void setHostedQueues(ConcurrentMap> hostedQueues) { + this.hostedQueues = hostedQueues; + } + public boolean checkIfAsPhysical(Integer globalId) { List mappingItems = getMappingInfo(globalId); return mappingItems == null diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java index f6122c05..39747b3a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java @@ -32,6 +32,10 @@ public class TopicQueueMappingInfo extends RemotingSerializable { //register to broker to construct the route transient ConcurrentMap currIdMap = new ConcurrentHashMap(); + public TopicQueueMappingInfo() { + + } + public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) { this.topic = topic; this.totalQueues = totalQueues; diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java index a1b3d277..b0cc5dd7 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java @@ -4,8 +4,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.ImmutableList; -import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; -import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.junit.Assert; import org.junit.Test; @@ -20,33 +19,45 @@ public class TopicQueueMappingTest { System.out.println(File.separator); } - @Test public void testJsonSerialize() { LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L); String mappingItemJson = JSON.toJSONString(mappingItem) ; - System.out.println(mappingItemJson); - - Map mappingItemMap = JSON.parseObject(mappingItemJson, Map.class); - Assert.assertEquals(8, mappingItemMap.size()); - Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname()); - Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen()); - Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset()); - Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId()); - Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset()); - Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset()); - Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart()); - Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd()); - + { + Map mappingItemMap = JSON.parseObject(mappingItemJson, Map.class); + Assert.assertEquals(8, mappingItemMap.size()); + Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname()); + Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen()); + Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset()); + Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId()); + Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset()); + Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset()); + Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart()); + Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd()); + } + { + String mappingItemJson2 = RemotingSerializable.toJson(RemotingSerializable.decode(mappingItemJson.getBytes(), LogicQueueMappingItem.class), false); + Assert.assertEquals(mappingItemJson, mappingItemJson2); + } TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis()); mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem)); String mappingDetailJson = JSON.toJSONString(mappingDetail); - Map mappingDetailMap = JSON.parseObject(mappingDetailJson); - Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap")); - Assert.assertFalse(mappingDetailMap.containsKey("currIdMap")); - Assert.assertEquals(4, mappingDetailMap.size()); - Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size()); - Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size()); + { + Map mappingDetailMap = JSON.parseObject(mappingDetailJson); + Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap")); + Assert.assertFalse(mappingDetailMap.containsKey("currIdMap")); + Assert.assertEquals(6, mappingDetailMap.size()); + Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size()); + Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size()); + } + { + System.out.println(mappingDetailJson); + TopicQueueMappingDetail detailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class); + System.out.println(JSON.toJSONString(detailFromJson)); + + //Assert.assertEquals(1, detailFromJson.getHostedQueues().size()); + //Assert.assertEquals(1, detailFromJson.getHostedQueues().get("0").size()); + } } } diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index bb21dc29..dac33d0e 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -57,6 +57,7 @@ public class StaticTopicIT extends BaseConf { String broker = entry.getKey(); String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = entry.getValue(); + System.out.println(configMapping.getMappingDetail().toJson()); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 19136125..0c93a7ac 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -218,12 +218,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { @Override public TopicConfig examineTopicConfig(String addr, - String topic) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException { + String topic) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.examineTopicConfig(addr, topic); } @Override - public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException { return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic); } @@ -668,7 +668,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, InterruptedException, MQBrokerException { this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index d805f8dc..3c491cd0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -211,7 +211,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, InterruptedException, MQClientException { + public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis); } @@ -257,7 +258,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis); } @@ -1105,8 +1106,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws MQClientException { - this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force); + public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException { + this.mqClientInstance.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis); } @@ -1170,13 +1171,23 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException { + public Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException { Map brokerConfigMap = new HashMap<>(); + boolean getFromBrokers = false; + TopicRouteData routeData = null; try { - TopicRouteData routeData = examineTopicRouteInfo(topic); - clientMetadata.freshTopicRoute(topic, routeData); + routeData = examineTopicRouteInfo(topic); + } catch (MQClientException exception) { + if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage()); + } else { + getFromBrokers = true; + } + } + if (!getFromBrokers) { if (routeData != null && !routeData.getQueueDatas().isEmpty()) { + clientMetadata.freshTopicRoute(topic, routeData); for (QueueData queueData: routeData.getQueueDatas()) { String bname = queueData.getBrokerName(); String addr = clientMetadata.findMasterBrokerAddr(bname); @@ -1186,29 +1197,21 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { if (mapping != null) { brokerConfigMap.put(bname, mapping); } - } catch (MQClientException exception) { + } catch (MQBrokerException exception) { if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { throw exception; } - } } } - } catch (MQClientException exception) { - if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { - throw exception; - } + } else { log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic); //if cannot get from nameserver, then check all the brokers - try { - ClusterInfo clusterInfo = examineBrokerClusterInfo(); - if (clusterInfo != null - && clusterInfo.getBrokerAddrTable() != null) { - clientMetadata.refreshClusterInfo(clusterInfo); - } - }catch (MQBrokerException e) { - throw new MQClientException(e.getResponseCode(), e.getMessage()); + ClusterInfo clusterInfo = examineBrokerClusterInfo(); + if (clusterInfo != null + && clusterInfo.getBrokerAddrTable() != null) { + clientMetadata.refreshClusterInfo(clusterInfo); } for (Entry> entry : clientMetadata.getBrokerAddrTable().entrySet()) { String bname = entry.getKey(); @@ -1221,12 +1224,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { if (mapping != null) { brokerConfigMap.put(bname, mapping); } - } catch (MQClientException clientException) { - if (clientException.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { - throw clientException; + } catch (MQBrokerException exception1) { + if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw exception1; } } - } } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 60a366d5..6414a9be 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -108,8 +108,6 @@ public interface MQAdminExt extends MQAdmin { SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException; - TopicConfig examineTopicConfig(final String addr, - final String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; TopicStatsTable examineTopicStats( final String topic) throws RemotingException, MQClientException, InterruptedException, @@ -344,9 +342,12 @@ public interface MQAdminExt extends MQAdmin { LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; - void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQClientException; + TopicConfig examineTopicConfig(final String addr, + final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; + + void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException; - Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException; + Map examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException; void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set brokersToMapIn, Set brokersToMapOut, Map brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; -- GitLab