diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 44a5113293641938efe4d48f360e44413f497c83..a00722585b3aff592206cc889c127333c390f968 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -209,7 +209,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) { + public TopicConfig examineTopicConfig(String addr, String topic) throws RemotingException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.examineTopicConfig(addr, topic); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index d0e65ba7f63e1f4beab45fff3731c7d6be0c3b3a..66ecc54279e2910ffa600abe0aef678c7b1be3a1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -224,8 +224,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) { - return null; + public TopicConfig examineTopicConfig(String addr, String topic) throws RemotingException, InterruptedException, MQBrokerException { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(addr,timeoutMillis); + return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index d7010561a7ff0c6357ab8b8a62b5ea95cccfb247..94791d071e6494581a2e81ed7c74dfdf44e5ed00 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -92,7 +92,7 @@ public interface MQAdminExt extends MQAdmin { SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException; - TopicConfig examineTopicConfig(final String addr, final String topic); + TopicConfig examineTopicConfig(final String addr, final String topic) throws RemotingException, InterruptedException, MQBrokerException; TopicStatsTable examineTopicStats( final String topic) throws RemotingException, MQClientException, InterruptedException, diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 8a29e0f34ec22b03e87d3275d890557945ab449c..5c69774c11c2329a5edb31c1a73c1fe768d1a300 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; @@ -54,6 +55,7 @@ import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -226,6 +228,14 @@ public class DefaultMQAdminExtTest { consumerRunningInfo.setStatusTable(new TreeMap()); consumerRunningInfo.setSubscriptionSet(new TreeSet()); when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); + + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap() { + { + put("topic_test_examine_topicConfig", new TopicConfig("topic_test_examine_topicConfig")); + } + }); + when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper); } @AfterClass @@ -413,4 +423,10 @@ public class DefaultMQAdminExtTest { assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one"); assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue(); } + + @Test + public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException { + TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig"); + assertThat(topicConfig.getTopicName().equals("topic_test_examine_topicConfig")); + } } \ No newline at end of file