diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 6592639035f922a362f7503342d34f7ff06ac11b..cac64f021ea1aaa1fc0a002cefafcf80c082e3ab 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -208,7 +208,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) { + public TopicConfig examineTopicConfig(String addr, String topic) throws RemotingException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.examineTopicConfig(addr, topic); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 8930bbe49d207ab2dc361caaee6d88f75762cbea..e22956315829fad8203d831ceed3c966f7d979fb 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -222,8 +222,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) { - return null; + public TopicConfig examineTopicConfig(String addr, String topic) throws RemotingException, InterruptedException, MQBrokerException { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(addr,timeoutMillis); + return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index d5462cb04e5a41873690c10cf67b880e21ad6bd1..b7527bedae3d4fcd92fda60627fa86fa16f3d1b7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -92,7 +92,7 @@ public interface MQAdminExt extends MQAdmin { SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group); - TopicConfig examineTopicConfig(final String addr, final String topic); + TopicConfig examineTopicConfig(final String addr, final String topic) throws RemotingException, InterruptedException, MQBrokerException; TopicStatsTable examineTopicStats( final String topic) throws RemotingException, MQClientException, InterruptedException, diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 3146b1781154792a2d7928e0306e9cf172b5ddac..bfb9d6746932f50becce022cd27f3dc1660c378a 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; @@ -54,6 +55,7 @@ import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -225,6 +227,14 @@ public class DefaultMQAdminExtTest { consumerRunningInfo.setStatusTable(new TreeMap()); consumerRunningInfo.setSubscriptionSet(new TreeSet()); when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); + + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap() { + { + put("topic_test_examine_topicConfig", new TopicConfig("topic_test_examine_topicConfig")); + } + }); + when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper); } @AfterClass @@ -406,4 +416,10 @@ public class DefaultMQAdminExtTest { assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one"); assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue(); } + + @Test + public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException { + TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig"); + assertThat(topicConfig.getTopicName().equals("topic_test_examine_topicConfig")); + } } \ No newline at end of file