From cd45023d61c6184cf798a66e9d83e00cd8c8f4be Mon Sep 17 00:00:00 2001 From: panzhi Date: Wed, 13 Oct 2021 19:47:51 +0800 Subject: [PATCH] [ISSUE #3148]Support metadata export (#3149) --- distribution/bin/export.sh | 89 ++++++ .../tools/admin/DefaultMQAdminExt.java | 18 +- .../tools/admin/DefaultMQAdminExtImpl.java | 57 +++- .../rocketmq/tools/admin/MQAdminExt.java | 10 +- .../rocketmq/tools/command/CommandUtil.java | 1 - .../tools/command/MQAdminStartup.java | 7 + .../command/export/ExportConfigsCommand.java | 128 ++++++++ .../command/export/ExportMetadataCommand.java | 184 ++++++++++++ .../command/export/ExportMetricsCommand.java | 282 ++++++++++++++++++ 9 files changed, 771 insertions(+), 5 deletions(-) create mode 100644 distribution/bin/export.sh create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java diff --git a/distribution/bin/export.sh b/distribution/bin/export.sh new file mode 100644 index 00000000..2b323e8b --- /dev/null +++ b/distribution/bin/export.sh @@ -0,0 +1,89 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ -z "$ROCKETMQ_HOME" ]; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ]; do + ls=$(ls -ld "$PRG") + link=$(expr "$ls" : '.*-> \(.*\)$') + if expr "$link" : '/.*' >/dev/null; then + PRG="$link" + else + PRG="$(dirname "$PRG")/$link" + fi + done + + saveddir=$(pwd) + + ROCKETMQ_HOME=$(dirname "$PRG")/.. + + # make it fully qualified + ROCKETMQ_HOME=$(cd "$ROCKETMQ_HOME" && pwd) + + cd "$saveddir" +fi + +export ROCKETMQ_HOME + +namesrvAddr= +while [ -z "${namesrvAddr}" ]; do + read -p "Enter name server address list:" namesrvAddr +done + +clusterName= +while [ -z "${clusterName}" ]; do + read -p "Choose a cluster to export:" clusterName +done + +read -p "Enter file path to export [default /tmp/rocketmq/export]:" filePath +if [ -z "${filePath}" ]; then + filePath="/tmp/rocketmq/config" +fi + +if [[ -e ${filePath} ]]; then + rm -rf ${filePath} +fi + +sh ${ROCKETMQ_HOME}/bin/mqadmin exportMetrics -c ${clusterName} -n ${namesrvAddr} -f ${filePath} +sh ${ROCKETMQ_HOME}/bin/mqadmin exportConfigs -c ${clusterName} -n ${namesrvAddr} -f ${filePath} +sh ${ROCKETMQ_HOME}/bin/mqadmin exportMetadata -c ${clusterName} -n ${namesrvAddr} -f ${filePath} + +cd ${filePath} || exit + +configs=$(cat ./configs.json) +if [ -z "$configs" ]; then + configs="{}" +fi +metadata=$(cat ./metadata.json) +if [ -z "$metadata" ]; then + metadata="{}" +fi +metrics=$(cat ./metrics.json) +if [ -z "$metrics" ]; then + metrics="{}" +fi + +echo "{ + \"configs\": ${configs}, + \"metadata\": ${metadata}, + \"metrics\": ${metrics} + }" >rocketmq-metadata-export.json + +echo -e "[INFO] The RocketMQ metadata has been exported to the file:${filePath}/rocketmq-metadata-export.json" diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index a0072258..c3e3a30d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -501,10 +501,24 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, + public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { - return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis); + return this.defaultMQAdminExtImpl.getUserSubscriptionGroup(brokerAddr, timeoutMillis); + } + + @Override + public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException { + return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr, timeoutMillis); + } + + @Override + public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic, + long timeoutMillis) throws InterruptedException, RemotingException, + MQBrokerException, MQClientException { + return this.defaultMQAdminExtImpl.getUserTopicConfig(brokerAddr, specialTopic, timeoutMillis); } /* (non-Javadoc) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 66ecc542..80999c24 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -99,6 +99,24 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { private long timeoutMillis = 20000; private Random random = new Random(); + private static final Set SYSTEM_GROUP_SET = new HashSet(); + + static { + SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_PRODUCER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.ONS_HTTP_PROXY_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PERMISSION_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_OWNER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PULL_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS); + } + public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) { this(defaultMQAdminExt, null, timeoutMillis); } @@ -964,12 +982,49 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, + public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr, + long timeoutMillis) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + SubscriptionGroupWrapper subscriptionGroupWrapper = this.mqClientInstance.getMQClientAPIImpl() + .getAllSubscriptionGroup(brokerAddr, timeoutMillis); + + Iterator> iterator = subscriptionGroupWrapper.getSubscriptionGroupTable() + .entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry configEntry = iterator.next(); + if (MixAll.isSysConsumerGroup(configEntry.getKey()) || SYSTEM_GROUP_SET.contains(configEntry.getKey())) { + iterator.remove(); + } + } + + return subscriptionGroupWrapper; + } + + @Override + public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis); } + @Override + public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic, + long timeoutMillis) throws InterruptedException, RemotingException, + MQBrokerException, MQClientException { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.getAllTopicConfig(brokerAddr, timeoutMillis); + TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(brokerAddr, + timeoutMillis); + Iterator> iterator = topicConfigSerializeWrapper.getTopicConfigTable().entrySet() + .iterator(); + while (iterator.hasNext()) { + String topic = iterator.next().getKey(); + if (topicList.getTopicList().contains(topic) || (!specialTopic && (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)))) { + iterator.remove(); + } + } + return topicConfigSerializeWrapper; + } + @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, newTopic, queueNum, 0); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 94791d07..82c1cbd3 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -238,10 +238,18 @@ public interface MQAdminExt extends MQAdmin { long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException; - TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, + SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException; + TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException; + + TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic, + long timeoutMillis) throws InterruptedException, RemotingException, + MQBrokerException, MQClientException; + void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java index 2e65f980..8984ca67 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java @@ -146,5 +146,4 @@ public class CommandUtil { } throw new Exception(ERROR_MESSAGE); } - } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 4411a6cc..e8572bd7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -52,6 +52,8 @@ import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand; import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand; import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand; +import org.apache.rocketmq.tools.command.export.ExportMetricsCommand; +import org.apache.rocketmq.tools.command.export.ExportConfigsCommand; import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand; import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand; @@ -74,6 +76,7 @@ import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand; import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand; import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand; +import org.apache.rocketmq.tools.command.export.ExportMetadataCommand; import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand; import org.apache.rocketmq.tools.command.topic.TopicListSubCommand; import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand; @@ -217,6 +220,10 @@ public class MQAdminStartup { initCommand(new ClusterAclConfigVersionListSubCommand()); initCommand(new UpdateGlobalWhiteAddrSubCommand()); initCommand(new GetAccessConfigSubCommand()); + + initCommand(new ExportMetadataCommand()); + initCommand(new ExportConfigsCommand()); + initCommand(new ExportMetricsCommand()); } private static void initLogback() throws JoranException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java new file mode 100644 index 00000000..c3ca9d31 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.export; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import com.alibaba.fastjson.JSON; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class ExportConfigsCommand implements SubCommand { + @Override + public String commandName() { + return "exportConfigs"; + } + + @Override + public String commandDesc() { + return "export configs"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "clusterName", true, "choose a cluster to export"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("f", "filePath", true, + "export configs.json path | default /tmp/rocketmq/export"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) + throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + String clusterName = commandLine.getOptionValue('c').trim(); + String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f') + .trim(); + + defaultMQAdminExt.start(); + Map result = new HashMap<>(); + // name servers + List nameServerAddressList = defaultMQAdminExt.getNameServerAddressList(); + + //broker + int masterBrokerSize = 0; + int slaveBrokerSize = 0; + Map brokerConfigs = new HashMap<>(); + Map> masterAndSlaveMap + = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName); + for (String masterAddr : masterAndSlaveMap.keySet()) { + Properties masterProperties = defaultMQAdminExt.getBrokerConfig(masterAddr); + masterBrokerSize++; + slaveBrokerSize += masterAndSlaveMap.get(masterAddr).size(); + + brokerConfigs.put(masterProperties.getProperty("brokerName"), needBrokerProprties(masterProperties)); + } + + Map clusterScaleMap = new HashMap<>(); + clusterScaleMap.put("namesrvSize", nameServerAddressList.size()); + clusterScaleMap.put("masterBrokerSize", masterBrokerSize); + clusterScaleMap.put("slaveBrokerSize", slaveBrokerSize); + + result.put("brokerConfigs", brokerConfigs); + result.put("clusterScale", clusterScaleMap); + + String path = filePath + "/configs.json"; + MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path); + System.out.printf("export %s success", path); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + private Properties needBrokerProprties(Properties properties) { + Properties newProperties = new Properties(); + newProperties.setProperty("brokerClusterName", properties.getProperty("brokerClusterName")); + newProperties.setProperty("brokerId", properties.getProperty("brokerId")); + newProperties.setProperty("brokerName", properties.getProperty("brokerName")); + newProperties.setProperty("brokerRole", properties.getProperty("brokerRole")); + newProperties.setProperty("fileReservedTime", properties.getProperty("fileReservedTime")); + newProperties.setProperty("filterServerNums", properties.getProperty("filterServerNums")); + newProperties.setProperty("flushDiskType", properties.getProperty("flushDiskType")); + newProperties.setProperty("maxMessageSize", properties.getProperty("maxMessageSize")); + newProperties.setProperty("messageDelayLevel", properties.getProperty("messageDelayLevel")); + newProperties.setProperty("msgTraceTopicName", properties.getProperty("msgTraceTopicName")); + newProperties.setProperty("slaveReadEnable", properties.getProperty("slaveReadEnable")); + newProperties.setProperty("traceOn", properties.getProperty("traceOn")); + newProperties.setProperty("traceTopicEnable", properties.getProperty("traceTopicEnable")); + newProperties.setProperty("useTLS", properties.getProperty("useTLS")); + newProperties.setProperty("autoCreateTopicEnable", properties.getProperty("autoCreateTopicEnable")); + newProperties.setProperty("autoCreateSubscriptionGroup", properties.getProperty("autoCreateSubscriptionGroup")); + return newProperties; + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java new file mode 100644 index 00000000..19094364 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.export; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.alibaba.fastjson.JSON; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class ExportMetadataCommand implements SubCommand { + + private static final String DEFAULT_FILE_PATH = "/tmp/rocketmq/export"; + + @Override + public String commandName() { + return "exportMetadata"; + } + + @Override + public String commandDesc() { + return "export metadata"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "clusterName", true, "choose a cluster to export"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("b", "brokerAddr", true, "choose a broker to export"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("f", "filePath", true, "export metadata.json path | default /tmp/rocketmq/export"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", false, "only export topic metadata"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "subscriptionGroup", false, "only export subscriptionGroup metadata"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "specialTopic", false, "need retryTopic and dlqTopic"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) + throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + + String filePath = !commandLine.hasOption('f') ? DEFAULT_FILE_PATH : commandLine.getOptionValue('f') + .trim(); + + boolean specialTopic = commandLine.hasOption('s'); + + if (commandLine.hasOption('b')) { + final String brokerAddr = commandLine.getOptionValue('b').trim(); + + if (commandLine.hasOption('t')) { + filePath = filePath + "/topic.json"; + TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig( + brokerAddr, specialTopic, 10000L); + MixAll.string2FileNotSafe(JSON.toJSONString(topicConfigSerializeWrapper, true), filePath); + System.out.printf("export %s success", filePath); + } else if (commandLine.hasOption('g')) { + filePath = filePath + "/subscriptionGroup.json"; + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup( + brokerAddr, 10000L); + MixAll.string2FileNotSafe(JSON.toJSONString(subscriptionGroupWrapper, true), filePath); + System.out.printf("export %s success", filePath); + } + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + Set masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + + Map topicConfigMap = new HashMap<>(); + Map subGroupConfigMap = new HashMap<>(); + + for (String addr : masterSet) { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig( + addr, specialTopic, 10000L); + + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup( + addr, 10000); + + if (commandLine.hasOption('t')) { + filePath = filePath + "/topic.json"; + MixAll.string2FileNotSafe(JSON.toJSONString(topicConfigSerializeWrapper, true), filePath); + System.out.printf("export %s success", filePath); + return; + } else if (commandLine.hasOption('g')) { + filePath = filePath + "/subscriptionGroup.json"; + MixAll.string2FileNotSafe(JSON.toJSONString(subscriptionGroupWrapper, true), filePath); + System.out.printf("export %s success", filePath); + return; + } else { + for (Map.Entry entry : topicConfigSerializeWrapper.getTopicConfigTable().entrySet()) { + TopicConfig topicConfig = topicConfigMap.get(entry.getKey()); + if (null != topicConfig) { + entry.getValue().setWriteQueueNums( + topicConfig.getWriteQueueNums() + entry.getValue().getWriteQueueNums()); + entry.getValue().setReadQueueNums( + topicConfig.getReadQueueNums() + entry.getValue().getReadQueueNums()); + } + topicConfigMap.put(entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : subscriptionGroupWrapper.getSubscriptionGroupTable().entrySet()) { + + SubscriptionGroupConfig subscriptionGroupConfig = subGroupConfigMap.get(entry.getKey()); + if (null != subscriptionGroupConfig) { + entry.getValue().setRetryQueueNums( + subscriptionGroupConfig.getRetryQueueNums() + entry.getValue().getRetryQueueNums()); + } + subGroupConfigMap.put(entry.getKey(), entry.getValue()); + } + + Map result = new HashMap<>(); + result.put("topicConfigTable", topicConfigMap); + result.put("subscriptionGroupTable", subGroupConfigMap); + result.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); + result.put("exportTime", System.currentTimeMillis()); + + filePath = filePath + "/metadata.json"; + MixAll.string2FileNotSafe(JSON.toJSONString(result, true), filePath); + System.out.printf("export %s success", filePath); + } + + } + } else { + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} + diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java new file mode 100644 index 00000000..2b7db0b1 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.export; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import com.alibaba.fastjson.JSON; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; + +public class ExportMetricsCommand implements SubCommand { + + @Override + public String commandName() { + return "exportMetrics"; + } + + @Override + public String commandDesc() { + return "export metrics"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "clusterName", true, "choose a cluster to export"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("f", "filePath", true, + "export metrics.json path | default /tmp/rocketmq/export"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) + throws SubCommandException { + + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + String clusterName = commandLine.getOptionValue('c').trim(); + String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f') + .trim(); + + defaultMQAdminExt.start(); + + Map>> evaluateReportMap = new HashMap<>(); + Map totalTpsMap = new HashMap<>(); + Map totalOneDayNumMap = new HashMap<>(); + initTotalMap(totalTpsMap, totalOneDayNumMap); + + ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); + Set brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); + for (String brokerName : brokerNameSet) { + BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); + if (brokerData != null) { + String addr = brokerData.getBrokerAddrs().get(0L); + + KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(addr); + + Properties properties = defaultMQAdminExt.getBrokerConfig(addr); + + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup(addr, + 10000); + + Map> brokerInfo = new HashMap<>(); + + //broker environment,machine configuration + brokerInfo.put("runtimeEnv", getRuntimeEnv(kvTable, properties)); + + brokerInfo.put("runtimeQuota", + getRuntimeQuota(kvTable, defaultMQAdminExt, addr, totalTpsMap, + totalOneDayNumMap, subscriptionGroupWrapper)); + + // runtime version + brokerInfo.put("runtimeVersion", + getRuntimeVersion(defaultMQAdminExt, subscriptionGroupWrapper)); + + evaluateReportMap.put(brokerName, brokerInfo); + } + + } + + String path = filePath + "/metrics.json"; + + Map totalData = new HashMap<>(); + totalData.put("totalTps", totalTpsMap); + totalData.put("totalOneDayNum", totalOneDayNumMap); + + Map result = new HashMap<>(); + result.put("evaluateReport", evaluateReportMap); + result.put("totalData", totalData); + + MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path); + System.out.printf("export %s success", path); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + + } + + private Map getRuntimeVersion(DefaultMQAdminExt defaultMQAdminExt, + SubscriptionGroupWrapper subscriptionGroupWrapper) { + Map runtimeVersionMap = new HashMap(); + + Set clientInfoSet = new HashSet<>(); + for (Map.Entry entry : subscriptionGroupWrapper + .getSubscriptionGroupTable().entrySet()) { + try { + ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo( + entry.getValue().getGroupName()); + for (Connection conn : cc.getConnectionSet()) { + String clientInfo = conn.getLanguage() + "%" + MQVersion.getVersionDesc(conn.getVersion()); + clientInfoSet.add(clientInfo); + } + } catch (Exception e) { + continue; + } + } + runtimeVersionMap.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); + runtimeVersionMap.put("clientInfo", clientInfoSet); + return runtimeVersionMap; + } + + private Map getRuntimeEnv(KVTable kvTable, Properties properties) { + Map runtimeEnvMap = new HashMap<>(); + runtimeEnvMap.put("cpuNum", properties.getProperty("clientCallbackExecutorThreads")); + runtimeEnvMap.put("totalMemKBytes", kvTable.getTable().get("totalMemKBytes")); + return runtimeEnvMap; + } + + private Map getRuntimeQuota(KVTable kvTable, DefaultMQAdminExt defaultMQAdminExt, String brokerAddr, + Map totalTpsMap, Map totalOneDayNumMap, + SubscriptionGroupWrapper subscriptionGroupWrapper) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig( + brokerAddr, false, 10000); + + BrokerStatsData transStatsData = null; + + try { + transStatsData = defaultMQAdminExt.viewBrokerStatsData(brokerAddr, + BrokerStatsManager.TOPIC_PUT_NUMS, + TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC); + } catch (MQClientException e) { + } + + BrokerStatsData scheduleStatsData = null; + try { + scheduleStatsData = defaultMQAdminExt.viewBrokerStatsData(brokerAddr, + BrokerStatsManager.TOPIC_PUT_NUMS, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC); + } catch (MQClientException e) { + } + + Map runtimeQuotaMap = new HashMap<>(); + //disk use ratio + Map diskRatioMap = new HashMap<>(); + diskRatioMap.put("commitLogDiskRatio", kvTable.getTable().get("commitLogDiskRatio")); + diskRatioMap.put("consumeQueueDiskRatio", kvTable.getTable().get("consumeQueueDiskRatio")); + runtimeQuotaMap.put("diskRatio", diskRatioMap); + + //inTps and outTps + Map tpsMap = new HashMap<>(); + double normalInTps = 0; + double normalOutTps = 0; + String putTps = kvTable.getTable().get("putTps"); + String getTransferedTps = kvTable.getTable().get("getTransferedTps"); + String[] inTpss = putTps.split(" "); + if (inTpss.length > 0) { + normalInTps = Double.parseDouble(inTpss[0]); + } + + String[] outTpss = getTransferedTps.split(" "); + if (outTpss.length > 0) { + normalOutTps = Double.parseDouble(outTpss[0]); + } + + double transInTps = null != transStatsData ? transStatsData.getStatsMinute().getTps() : 0.0; + double scheduleInTps = null != scheduleStatsData ? scheduleStatsData.getStatsMinute().getTps() : 0.0; + + long transOneDayInNum = null != transStatsData ? StatsAllSubCommand.compute24HourSum(transStatsData) : 0; + long scheduleOneDayInNum = null != scheduleStatsData ? StatsAllSubCommand.compute24HourSum(scheduleStatsData) : 0; + + //current minute tps + tpsMap.put("normalInTps", normalInTps); + tpsMap.put("normalOutTps", normalOutTps); + tpsMap.put("transInTps", transInTps); + tpsMap.put("scheduleInTps", scheduleInTps); + runtimeQuotaMap.put("tps", tpsMap); + + //one day num + Map oneDayNumMap = new HashMap<>(); + long normalOneDayInNum = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayMorning")) - + Long.parseLong(kvTable.getTable().get("msgPutTotalYesterdayMorning")); + long normalOneDayOutNum = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayMorning")) - + Long.parseLong(kvTable.getTable().get("msgGetTotalYesterdayMorning")); + oneDayNumMap.put("normalOneDayInNum", normalOneDayInNum); + oneDayNumMap.put("normalOneDayOutNum", normalOneDayOutNum); + oneDayNumMap.put("transOneDayInNum", transOneDayInNum); + oneDayNumMap.put("scheduleOneDayInNum", scheduleOneDayInNum); + runtimeQuotaMap.put("oneDayNum", oneDayNumMap); + + //all broker current minute tps + totalTpsMap.put("totalNormalInTps", totalTpsMap.get("totalNormalInTps") + normalInTps); + totalTpsMap.put("totalNormalOutTps", totalTpsMap.get("totalNormalOutTps") + normalOutTps); + totalTpsMap.put("totalTransInTps", totalTpsMap.get("totalTransInTps") + transInTps); + totalTpsMap.put("totalScheduleInTps", totalTpsMap.get("totalScheduleInTps") + scheduleInTps); + + + //all broker one day num + totalOneDayNumMap.put("normalOneDayInNum", totalOneDayNumMap.get("normalOneDayInNum") + normalOneDayInNum); + totalOneDayNumMap.put("normalOneDayOutNum", totalOneDayNumMap.get("normalOneDayOutNum") + normalOneDayOutNum); + totalOneDayNumMap.put("transOneDayInNum", totalOneDayNumMap.get("transOneDayInNum") + transOneDayInNum); + totalOneDayNumMap.put("scheduleOneDayInNum", totalOneDayNumMap.get("scheduleOneDayInNum") + scheduleOneDayInNum); + + // putMessageAverageSize 平均 + runtimeQuotaMap.put("messageAverageSize", kvTable.getTable().get("putMessageAverageSize")); + + //topicSize + runtimeQuotaMap.put("topicSize", topicConfigSerializeWrapper.getTopicConfigTable().size()); + runtimeQuotaMap.put("groupSize", subscriptionGroupWrapper.getSubscriptionGroupTable().size()); + return runtimeQuotaMap; + } + + private void initTotalMap(Map totalTpsMap, Map totalOneDayNumMap) { + totalTpsMap.put("totalNormalInTps", 0.0); + totalTpsMap.put("totalNormalOutTps", 0.0); + totalTpsMap.put("totalTransInTps", 0.0); + totalTpsMap.put("totalScheduleInTps", 0.0); + + totalOneDayNumMap.put("normalOneDayInNum", 0L); + totalOneDayNumMap.put("normalOneDayOutNum", 0L); + totalOneDayNumMap.put("transOneDayInNum", 0L); + totalOneDayNumMap.put("scheduleOneDayInNum", 0L); + } +} -- GitLab