From 46bd4be66b1566e3386dc3f1d55b6cfb6ac207ce Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Wed, 2 Jun 2021 09:52:17 +0800 Subject: [PATCH] [ISSUE #2964] Add a query consumer config command in mqadmin. --- .../tools/admin/DefaultMQAdminExt.java | 3 +- .../tools/admin/DefaultMQAdminExtImpl.java | 6 +- .../rocketmq/tools/admin/MQAdminExt.java | 2 +- .../tools/command/MQAdminStartup.java | 2 + .../consumer/GetConsumerConfigSubCommand.java | 146 ++++++++++++++++++ .../GetConsumerConfigSubCommandTest.java | 83 ++++++++++ 6 files changed, 238 insertions(+), 4 deletions(-) create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 65926390..4b83bb41 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -203,7 +203,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { + public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) + throws InterruptedException, RemotingException, MQClientException, MQBrokerException { return defaultMQAdminExtImpl.examineSubscriptionGroupConfig(addr, group); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 8930bbe4..543f265f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -217,8 +217,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { - return null; + public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) + throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SubscriptionGroupWrapper wrapper = this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(addr, timeoutMillis); + return wrapper.getSubscriptionGroupTable().get(group); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index d5462cb0..188f14d2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -90,7 +90,7 @@ public interface MQAdminExt extends MQAdmin { final SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group); + SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException; TopicConfig examineTopicConfig(final String addr, final String topic); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index f9477445..a055f7e3 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -49,6 +49,7 @@ import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand; import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand; import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand; +import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand; import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand; import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand; import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; @@ -202,6 +203,7 @@ public class MQAdminStartup { initCommand(new GetNamesrvConfigCommand()); initCommand(new UpdateNamesrvConfigCommand()); initCommand(new GetBrokerConfigCommand()); + initCommand(new GetConsumerConfigSubCommand()); initCommand(new QueryConsumeQueueCommand()); initCommand(new SendMessageCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java new file mode 100644 index 00000000..be2b9466 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.consumer; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.collections.CollectionUtils; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class GetConsumerConfigSubCommand implements SubCommand { + + @Override + public String commandName() { + return "getConsumerConfig"; + } + + @Override + public String commandDesc() { + return "Get consumer config by subscription group name!"; + } + + @Override + public Options buildCommandlineOptions(final Options options) { + Option opt = new Option("g", "groupName", true, "subscription group name"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, + RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook); + adminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + String groupName = commandLine.getOptionValue('g').trim(); + try { + adminExt.start(); + List consumerConfigInfoList = new ArrayList<>(); + ClusterInfo clusterInfo = adminExt.examineBrokerClusterInfo(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); + for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) { + String clusterName = this.getClusterName(brokerName, clusterAddrTable); + String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(); + SubscriptionGroupConfig subscriptionGroupConfig = adminExt.examineSubscriptionGroupConfig(brokerAddress, groupName); + if (subscriptionGroupConfig == null) { + continue; + } + consumerConfigInfoList.add(new ConsumerConfigInfo(clusterName, brokerName, subscriptionGroupConfig)); + } + if (CollectionUtils.isEmpty(consumerConfigInfoList)) { + return; + } + for (ConsumerConfigInfo info : consumerConfigInfoList) { + System.out.printf("=============================%s:%s=============================\n", + info.getClusterName(), info.getBrokerName()); + SubscriptionGroupConfig config = info.getSubscriptionGroupConfig(); + Field[] fields = config.getClass().getDeclaredFields(); + for (Field field : fields) { + field.setAccessible(true); + if (field.get(config) != null) { + System.out.printf("%s%-40s= %s\n", "", field.getName(), field.get(config).toString()); + } else { + System.out.printf("%s%-40s= %s\n", "", field.getName(), ""); + } + } + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + adminExt.shutdown(); + } + } + + private String getClusterName(String brokeName, Map> clusterAddrTable) { + for (Map.Entry> entry : clusterAddrTable.entrySet()) { + Set brokerNameSet = entry.getValue(); + if (brokerNameSet.contains(brokeName)) { + return entry.getKey(); + } + } + return null; + } +} + +class ConsumerConfigInfo { + private String clusterName; + + private String brokerName; + + private SubscriptionGroupConfig subscriptionGroupConfig; + + public ConsumerConfigInfo(String clusterName, String brokerName, SubscriptionGroupConfig subscriptionGroupConfig) { + this.clusterName = clusterName; + this.brokerName = brokerName; + this.subscriptionGroupConfig = subscriptionGroupConfig; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerNameList) { + this.brokerName = brokerName; + } + + public SubscriptionGroupConfig getSubscriptionGroupConfig() { + return subscriptionGroupConfig; + } + + public void setSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) { + this.subscriptionGroupConfig = subscriptionGroupConfig; + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java new file mode 100644 index 00000000..1ec68ff0 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.consumer; + +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class GetConsumerConfigSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Ignore + @Test + public void testExecute() throws SubCommandException { + System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876"); + GetConsumerConfigSubCommand cmd = new GetConsumerConfigSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g group_test"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file -- GitLab