From e6768baae7a1e6514bbc82e518a21d8c42da13a0 Mon Sep 17 00:00:00 2001 From: HeChengyang Date: Wed, 23 Jun 2021 15:41:31 +0800 Subject: [PATCH] [ISSUE #2962] Implement DefaultMQAdminExt::examineTopicConfig function --- .../rocketmq/tools/admin/DefaultMQAdminExt.java | 2 +- .../tools/admin/DefaultMQAdminExtImpl.java | 5 +++-- .../apache/rocketmq/tools/admin/MQAdminExt.java | 2 +- .../tools/admin/DefaultMQAdminExtTest.java | 16 ++++++++++++++++ 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 65926390..cac64f02 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -208,7 +208,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) { + public TopicConfig examineTopicConfig(String addr, String topic) throws RemotingException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.examineTopicConfig(addr, topic); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 8930bbe4..e2295631 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -222,8 +222,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) { - return null; + public TopicConfig examineTopicConfig(String addr, String topic) throws RemotingException, InterruptedException, MQBrokerException { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(addr,timeoutMillis); + return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index d5462cb0..b7527bed 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -92,7 +92,7 @@ public interface MQAdminExt extends MQAdmin { SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group); - TopicConfig examineTopicConfig(final String addr, final String topic); + TopicConfig examineTopicConfig(final String addr, final String topic) throws RemotingException, InterruptedException, MQBrokerException; TopicStatsTable examineTopicStats( final String topic) throws RemotingException, MQClientException, InterruptedException, diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 3146b178..bfb9d674 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; @@ -54,6 +55,7 @@ import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -225,6 +227,14 @@ public class DefaultMQAdminExtTest { consumerRunningInfo.setStatusTable(new TreeMap()); consumerRunningInfo.setSubscriptionSet(new TreeSet()); when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); + + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap() { + { + put("topic_test_examine_topicConfig", new TopicConfig("topic_test_examine_topicConfig")); + } + }); + when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper); } @AfterClass @@ -406,4 +416,10 @@ public class DefaultMQAdminExtTest { assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one"); assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue(); } + + @Test + public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException { + TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig"); + assertThat(topicConfig.getTopicName().equals("topic_test_examine_topicConfig")); + } } \ No newline at end of file -- GitLab