Unverified commit cd45023d, authored by panzhi, committed by GitHub

[ISSUE #3148] Support metadata export (#3149)

Parent 23506fd4
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ -z "$ROCKETMQ_HOME" ]; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ]; do
ls=$(ls -ld "$PRG")
link=$(expr "$ls" : '.*-> \(.*\)$')
if expr "$link" : '/.*' >/dev/null; then
PRG="$link"
else
PRG="$(dirname "$PRG")/$link"
fi
done
saveddir=$(pwd)
ROCKETMQ_HOME=$(dirname "$PRG")/..
# make it fully qualified
ROCKETMQ_HOME=$(cd "$ROCKETMQ_HOME" && pwd)
cd "$saveddir"
fi
export ROCKETMQ_HOME
namesrvAddr=
while [ -z "${namesrvAddr}" ]; do
read -p "Enter name server address list:" namesrvAddr
done
clusterName=
while [ -z "${clusterName}" ]; do
read -p "Choose a cluster to export:" clusterName
done
read -p "Enter file path to export [default /tmp/rocketmq/export]:" filePath
if [ -z "${filePath}" ]; then
filePath="/tmp/rocketmq/config"
fi
if [[ -e "${filePath}" ]]; then
rm -rf "${filePath}"
fi
sh "${ROCKETMQ_HOME}/bin/mqadmin" exportMetrics -c "${clusterName}" -n "${namesrvAddr}" -f "${filePath}"
sh "${ROCKETMQ_HOME}/bin/mqadmin" exportConfigs -c "${clusterName}" -n "${namesrvAddr}" -f "${filePath}"
sh "${ROCKETMQ_HOME}/bin/mqadmin" exportMetadata -c "${clusterName}" -n "${namesrvAddr}" -f "${filePath}"
cd "${filePath}" || exit
configs=$(cat ./configs.json)
if [ -z "$configs" ]; then
configs="{}"
fi
metadata=$(cat ./metadata.json)
if [ -z "$metadata" ]; then
metadata="{}"
fi
metrics=$(cat ./metrics.json)
if [ -z "$metrics" ]; then
metrics="{}"
fi
echo "{
\"configs\": ${configs},
\"metadata\": ${metadata},
\"metrics\": ${metrics}
}" >rocketmq-metadata-export.json
echo -e "[INFO] The RocketMQ metadata has been exported to the file:${filePath}/rocketmq-metadata-export.json"
......@@ -501,10 +501,24 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
-public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
+public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
-return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis);
+return this.defaultMQAdminExtImpl.getUserSubscriptionGroup(brokerAddr, timeoutMillis);
}
@Override
public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr, timeoutMillis);
}
@Override
public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
MQBrokerException, MQClientException {
return this.defaultMQAdminExtImpl.getUserTopicConfig(brokerAddr, specialTopic, timeoutMillis);
}
/* (non-Javadoc)
......
......@@ -99,6 +99,24 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
private long timeoutMillis = 20000;
private Random random = new Random();
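// groups created by RocketMQ itself; they are excluded from getUserSubscriptionGroup results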
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>();
static {
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.ONS_HTTP_PROXY_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PERMISSION_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_OWNER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PULL_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS);
}
public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) {
this(defaultMQAdminExt, null, timeoutMillis);
}
......@@ -964,12 +982,49 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
-public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
+public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
SubscriptionGroupWrapper subscriptionGroupWrapper = this.mqClientInstance.getMQClientAPIImpl()
.getAllSubscriptionGroup(brokerAddr, timeoutMillis);
Iterator<Entry<String, SubscriptionGroupConfig>> iterator = subscriptionGroupWrapper.getSubscriptionGroupTable()
.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
if (MixAll.isSysConsumerGroup(configEntry.getKey()) || SYSTEM_GROUP_SET.contains(configEntry.getKey())) {
iterator.remove();
}
}
return subscriptionGroupWrapper;
}
@Override
public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
}
@Override
public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
MQBrokerException, MQClientException {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.getAllTopicConfig(brokerAddr, timeoutMillis);
TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(brokerAddr,
timeoutMillis);
Iterator<Entry<String, TopicConfig>> iterator = topicConfigSerializeWrapper.getTopicConfigTable().entrySet()
.iterator();
while (iterator.hasNext()) {
String topic = iterator.next().getKey();
if (topicList.getTopicList().contains(topic) || (!specialTopic && (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)))) {
iterator.remove();
}
}
return topicConfigSerializeWrapper;
}
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
......
......@@ -238,10 +238,18 @@ public interface MQAdminExt extends MQAdmin {
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
-TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
+SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
MQBrokerException, MQClientException;
void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
long offset) throws RemotingException, InterruptedException, MQBrokerException;
......
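For context, a minimal caller-side sketch of the two admin APIs this change introduces; the class name, broker address, and name-server address below are placeholders, and error handling is elided:
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
public class UserMetadataExportExample {
    public static void main(String[] args) throws Exception {
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
        adminExt.setNamesrvAddr("127.0.0.1:9876");   // placeholder name server
        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
        adminExt.start();
        try {
            // user topics only; pass true to also keep %RETRY% and %DLQ% topics
            TopicConfigSerializeWrapper topics =
                adminExt.getUserTopicConfig("127.0.0.1:10911", false, 10000L);
            // user subscription groups only; system groups are filtered out
            SubscriptionGroupWrapper groups =
                adminExt.getUserSubscriptionGroup("127.0.0.1:10911", 10000L);
            System.out.printf("user topics: %d, user groups: %d%n",
                topics.getTopicConfigTable().size(),
                groups.getSubscriptionGroupTable().size());
        } finally {
            adminExt.shutdown();
        }
    }
}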
......@@ -146,5 +146,4 @@ public class CommandUtil {
}
throw new Exception(ERROR_MESSAGE);
}
}
......@@ -52,6 +52,8 @@ import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand;
import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
......@@ -74,6 +76,7 @@ import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicListSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand;
......@@ -217,6 +220,10 @@ public class MQAdminStartup {
initCommand(new ClusterAclConfigVersionListSubCommand());
initCommand(new UpdateGlobalWhiteAddrSubCommand());
initCommand(new GetAccessConfigSubCommand());
initCommand(new ExportMetadataCommand());
initCommand(new ExportConfigsCommand());
initCommand(new ExportMetricsCommand());
}
private static void initLogback() throws JoranException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.export;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.alibaba.fastjson.JSON;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ExportConfigsCommand implements SubCommand {
@Override
public String commandName() {
return "exportConfigs";
}
@Override
public String commandDesc() {
return "export configs";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "filePath", true,
"export configs.json path | default /tmp/rocketmq/export");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String clusterName = commandLine.getOptionValue('c').trim();
String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f')
.trim();
defaultMQAdminExt.start();
Map<String, Object> result = new HashMap<>();
// name servers
List<String> nameServerAddressList = defaultMQAdminExt.getNameServerAddressList();
//broker
int masterBrokerSize = 0;
int slaveBrokerSize = 0;
Map<String, Properties> brokerConfigs = new HashMap<>();
Map<String, List<String>> masterAndSlaveMap
= CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
for (String masterAddr : masterAndSlaveMap.keySet()) {
Properties masterProperties = defaultMQAdminExt.getBrokerConfig(masterAddr);
masterBrokerSize++;
slaveBrokerSize += masterAndSlaveMap.get(masterAddr).size();
brokerConfigs.put(masterProperties.getProperty("brokerName"), needBrokerProperties(masterProperties));
}
Map<String, Integer> clusterScaleMap = new HashMap<>();
clusterScaleMap.put("namesrvSize", nameServerAddressList.size());
clusterScaleMap.put("masterBrokerSize", masterBrokerSize);
clusterScaleMap.put("slaveBrokerSize", slaveBrokerSize);
result.put("brokerConfigs", brokerConfigs);
result.put("clusterScale", clusterScaleMap);
String path = filePath + "/configs.json";
MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path);
System.out.printf("export %s success", path);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
private Properties needBrokerProperties(Properties properties) {
Properties newProperties = new Properties();
newProperties.setProperty("brokerClusterName", properties.getProperty("brokerClusterName"));
newProperties.setProperty("brokerId", properties.getProperty("brokerId"));
newProperties.setProperty("brokerName", properties.getProperty("brokerName"));
newProperties.setProperty("brokerRole", properties.getProperty("brokerRole"));
newProperties.setProperty("fileReservedTime", properties.getProperty("fileReservedTime"));
newProperties.setProperty("filterServerNums", properties.getProperty("filterServerNums"));
newProperties.setProperty("flushDiskType", properties.getProperty("flushDiskType"));
newProperties.setProperty("maxMessageSize", properties.getProperty("maxMessageSize"));
newProperties.setProperty("messageDelayLevel", properties.getProperty("messageDelayLevel"));
newProperties.setProperty("msgTraceTopicName", properties.getProperty("msgTraceTopicName"));
newProperties.setProperty("slaveReadEnable", properties.getProperty("slaveReadEnable"));
newProperties.setProperty("traceOn", properties.getProperty("traceOn"));
newProperties.setProperty("traceTopicEnable", properties.getProperty("traceTopicEnable"));
newProperties.setProperty("useTLS", properties.getProperty("useTLS"));
newProperties.setProperty("autoCreateTopicEnable", properties.getProperty("autoCreateTopicEnable"));
newProperties.setProperty("autoCreateSubscriptionGroup", properties.getProperty("autoCreateSubscriptionGroup"));
return newProperties;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.export;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.fastjson.JSON;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ExportMetadataCommand implements SubCommand {
private static final String DEFAULT_FILE_PATH = "/tmp/rocketmq/export";
@Override
public String commandName() {
return "exportMetadata";
}
@Override
public String commandDesc() {
return "export metadata";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("b", "brokerAddr", true, "choose a broker to export");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("f", "filePath", true, "export metadata.json path | default /tmp/rocketmq/export");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("t", "topic", false, "only export topic metadata");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("g", "subscriptionGroup", false, "only export subscriptionGroup metadata");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("s", "specialTopic", false, "need retryTopic and dlqTopic");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String filePath = !commandLine.hasOption('f') ? DEFAULT_FILE_PATH : commandLine.getOptionValue('f')
.trim();
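// -s keeps the %RETRY% and %DLQ% topics that getUserTopicConfig would otherwise filter out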
boolean specialTopic = commandLine.hasOption('s');
if (commandLine.hasOption('b')) {
final String brokerAddr = commandLine.getOptionValue('b').trim();
if (commandLine.hasOption('t')) {
filePath = filePath + "/topic.json";
TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig(
brokerAddr, specialTopic, 10000L);
MixAll.string2FileNotSafe(JSON.toJSONString(topicConfigSerializeWrapper, true), filePath);
System.out.printf("export %s success", filePath);
} else if (commandLine.hasOption('g')) {
filePath = filePath + "/subscriptionGroup.json";
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup(
brokerAddr, 10000L);
MixAll.string2FileNotSafe(JSON.toJSONString(subscriptionGroupWrapper, true), filePath);
System.out.printf("export %s success", filePath);
}
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Map<String, TopicConfig> topicConfigMap = new HashMap<>();
Map<String, SubscriptionGroupConfig> subGroupConfigMap = new HashMap<>();
for (String addr : masterSet) {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig(
addr, specialTopic, 10000L);
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup(
addr, 10000);
if (commandLine.hasOption('t')) {
filePath = filePath + "/topic.json";
MixAll.string2FileNotSafe(JSON.toJSONString(topicConfigSerializeWrapper, true), filePath);
System.out.printf("export %s success", filePath);
return;
} else if (commandLine.hasOption('g')) {
filePath = filePath + "/subscriptionGroup.json";
MixAll.string2FileNotSafe(JSON.toJSONString(subscriptionGroupWrapper, true), filePath);
System.out.printf("export %s success", filePath);
return;
} else {
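// merge per-master tables: queue counts for a topic/group seen on multiple brokers are summed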
for (Map.Entry<String, TopicConfig> entry : topicConfigSerializeWrapper.getTopicConfigTable().entrySet()) {
TopicConfig topicConfig = topicConfigMap.get(entry.getKey());
if (null != topicConfig) {
entry.getValue().setWriteQueueNums(
topicConfig.getWriteQueueNums() + entry.getValue().getWriteQueueNums());
entry.getValue().setReadQueueNums(
topicConfig.getReadQueueNums() + entry.getValue().getReadQueueNums());
}
topicConfigMap.put(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, SubscriptionGroupConfig> entry : subscriptionGroupWrapper.getSubscriptionGroupTable().entrySet()) {
SubscriptionGroupConfig subscriptionGroupConfig = subGroupConfigMap.get(entry.getKey());
if (null != subscriptionGroupConfig) {
entry.getValue().setRetryQueueNums(
subscriptionGroupConfig.getRetryQueueNums() + entry.getValue().getRetryQueueNums());
}
subGroupConfigMap.put(entry.getKey(), entry.getValue());
}
Map<String, Object> result = new HashMap<>();
result.put("topicConfigTable", topicConfigMap);
result.put("subscriptionGroupTable", subGroupConfigMap);
result.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
result.put("exportTime", System.currentTimeMillis());
filePath = filePath + "/metadata.json";
MixAll.string2FileNotSafe(JSON.toJSONString(result, true), filePath);
System.out.printf("export %s success", filePath);
}
}
} else {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.export;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import com.alibaba.fastjson.JSON;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
public class ExportMetricsCommand implements SubCommand {
@Override
public String commandName() {
return "exportMetrics";
}
@Override
public String commandDesc() {
return "export metrics";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "filePath", true,
"export metrics.json path | default /tmp/rocketmq/export");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String clusterName = commandLine.getOptionValue('c').trim();
String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f')
.trim();
defaultMQAdminExt.start();
Map<String, Map<String, Map<String, Object>>> evaluateReportMap = new HashMap<>();
Map<String, Double> totalTpsMap = new HashMap<>();
Map<String, Long> totalOneDayNumMap = new HashMap<>();
initTotalMap(totalTpsMap, totalOneDayNumMap);
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
for (String brokerName : brokerNameSet) {
BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
if (brokerData != null) {
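// brokerId 0 is the master; metrics are collected from the master address only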
String addr = brokerData.getBrokerAddrs().get(0L);
KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(addr);
Properties properties = defaultMQAdminExt.getBrokerConfig(addr);
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup(addr,
10000);
Map<String, Map<String, Object>> brokerInfo = new HashMap<>();
//broker environment,machine configuration
brokerInfo.put("runtimeEnv", getRuntimeEnv(kvTable, properties));
brokerInfo.put("runtimeQuota",
getRuntimeQuota(kvTable, defaultMQAdminExt, addr, totalTpsMap,
totalOneDayNumMap, subscriptionGroupWrapper));
// runtime version
brokerInfo.put("runtimeVersion",
getRuntimeVersion(defaultMQAdminExt, subscriptionGroupWrapper));
evaluateReportMap.put(brokerName, brokerInfo);
}
}
String path = filePath + "/metrics.json";
Map<String, Object> totalData = new HashMap<>();
totalData.put("totalTps", totalTpsMap);
totalData.put("totalOneDayNum", totalOneDayNumMap);
Map<String, Object> result = new HashMap<>();
result.put("evaluateReport", evaluateReportMap);
result.put("totalData", totalData);
MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path);
System.out.printf("export %s success", path);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
private Map<String, Object> getRuntimeVersion(DefaultMQAdminExt defaultMQAdminExt,
SubscriptionGroupWrapper subscriptionGroupWrapper) {
Map<String, Object> runtimeVersionMap = new HashMap<>();
Set<String> clientInfoSet = new HashSet<>();
for (Map.Entry<String, SubscriptionGroupConfig> entry : subscriptionGroupWrapper
.getSubscriptionGroupTable().entrySet()) {
try {
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(
entry.getValue().getGroupName());
for (Connection conn : cc.getConnectionSet()) {
String clientInfo = conn.getLanguage() + "%" + MQVersion.getVersionDesc(conn.getVersion());
clientInfoSet.add(clientInfo);
}
} catch (Exception e) {
// the group may have no online consumer connections; skip it
continue;
}
}
runtimeVersionMap.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
runtimeVersionMap.put("clientInfo", clientInfoSet);
return runtimeVersionMap;
}
private Map<String, Object> getRuntimeEnv(KVTable kvTable, Properties properties) {
Map<String, Object> runtimeEnvMap = new HashMap<>();
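// clientCallbackExecutorThreads defaults to the number of available processors, so it doubles as a CPU count here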
runtimeEnvMap.put("cpuNum", properties.getProperty("clientCallbackExecutorThreads"));
runtimeEnvMap.put("totalMemKBytes", kvTable.getTable().get("totalMemKBytes"));
return runtimeEnvMap;
}
private Map<String, Object> getRuntimeQuota(KVTable kvTable, DefaultMQAdminExt defaultMQAdminExt, String brokerAddr,
Map<String, Double> totalTpsMap, Map<String, Long> totalOneDayNumMap,
SubscriptionGroupWrapper subscriptionGroupWrapper)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig(
brokerAddr, false, 10000);
BrokerStatsData transStatsData = null;
try {
transStatsData = defaultMQAdminExt.viewBrokerStatsData(brokerAddr,
BrokerStatsManager.TOPIC_PUT_NUMS,
TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC);
} catch (MQClientException e) {
// the transaction half topic may have no put stats on this broker; leave as null and treat as zero
}
BrokerStatsData scheduleStatsData = null;
try {
scheduleStatsData = defaultMQAdminExt.viewBrokerStatsData(brokerAddr,
BrokerStatsManager.TOPIC_PUT_NUMS, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
} catch (MQClientException e) {
// the scheduled-message topic may have no put stats on this broker; leave as null and treat as zero
}
Map<String, Object> runtimeQuotaMap = new HashMap<>();
//disk use ratio
Map<String, Object> diskRatioMap = new HashMap<>();
diskRatioMap.put("commitLogDiskRatio", kvTable.getTable().get("commitLogDiskRatio"));
diskRatioMap.put("consumeQueueDiskRatio", kvTable.getTable().get("consumeQueueDiskRatio"));
runtimeQuotaMap.put("diskRatio", diskRatioMap);
//inTps and outTps
Map<String, Double> tpsMap = new HashMap<>();
double normalInTps = 0;
double normalOutTps = 0;
String putTps = kvTable.getTable().get("putTps");
String getTransferedTps = kvTable.getTable().get("getTransferedTps");
String[] inTpss = putTps.split(" ");
if (inTpss.length > 0) {
normalInTps = Double.parseDouble(inTpss[0]);
}
String[] outTpss = getTransferedTps.split(" ");
if (outTpss.length > 0) {
normalOutTps = Double.parseDouble(outTpss[0]);
}
double transInTps = null != transStatsData ? transStatsData.getStatsMinute().getTps() : 0.0;
double scheduleInTps = null != scheduleStatsData ? scheduleStatsData.getStatsMinute().getTps() : 0.0;
long transOneDayInNum = null != transStatsData ? StatsAllSubCommand.compute24HourSum(transStatsData) : 0;
long scheduleOneDayInNum = null != scheduleStatsData ? StatsAllSubCommand.compute24HourSum(scheduleStatsData) : 0;
//current minute tps
tpsMap.put("normalInTps", normalInTps);
tpsMap.put("normalOutTps", normalOutTps);
tpsMap.put("transInTps", transInTps);
tpsMap.put("scheduleInTps", scheduleInTps);
runtimeQuotaMap.put("tps", tpsMap);
//one day num
Map<String, Long> oneDayNumMap = new HashMap<>();
long normalOneDayInNum = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayMorning")) -
Long.parseLong(kvTable.getTable().get("msgPutTotalYesterdayMorning"));
long normalOneDayOutNum = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayMorning")) -
Long.parseLong(kvTable.getTable().get("msgGetTotalYesterdayMorning"));
oneDayNumMap.put("normalOneDayInNum", normalOneDayInNum);
oneDayNumMap.put("normalOneDayOutNum", normalOneDayOutNum);
oneDayNumMap.put("transOneDayInNum", transOneDayInNum);
oneDayNumMap.put("scheduleOneDayInNum", scheduleOneDayInNum);
runtimeQuotaMap.put("oneDayNum", oneDayNumMap);
//all broker current minute tps
totalTpsMap.put("totalNormalInTps", totalTpsMap.get("totalNormalInTps") + normalInTps);
totalTpsMap.put("totalNormalOutTps", totalTpsMap.get("totalNormalOutTps") + normalOutTps);
totalTpsMap.put("totalTransInTps", totalTpsMap.get("totalTransInTps") + transInTps);
totalTpsMap.put("totalScheduleInTps", totalTpsMap.get("totalScheduleInTps") + scheduleInTps);
//all broker one day num
totalOneDayNumMap.put("normalOneDayInNum", totalOneDayNumMap.get("normalOneDayInNum") + normalOneDayInNum);
totalOneDayNumMap.put("normalOneDayOutNum", totalOneDayNumMap.get("normalOneDayOutNum") + normalOneDayOutNum);
totalOneDayNumMap.put("transOneDayInNum", totalOneDayNumMap.get("transOneDayInNum") + transOneDayInNum);
totalOneDayNumMap.put("scheduleOneDayInNum", totalOneDayNumMap.get("scheduleOneDayInNum") + scheduleOneDayInNum);
// putMessageAverageSize: average size of messages put
runtimeQuotaMap.put("messageAverageSize", kvTable.getTable().get("putMessageAverageSize"));
//topicSize
runtimeQuotaMap.put("topicSize", topicConfigSerializeWrapper.getTopicConfigTable().size());
runtimeQuotaMap.put("groupSize", subscriptionGroupWrapper.getSubscriptionGroupTable().size());
return runtimeQuotaMap;
}
private void initTotalMap(Map<String, Double> totalTpsMap, Map<String, Long> totalOneDayNumMap) {
totalTpsMap.put("totalNormalInTps", 0.0);
totalTpsMap.put("totalNormalOutTps", 0.0);
totalTpsMap.put("totalTransInTps", 0.0);
totalTpsMap.put("totalScheduleInTps", 0.0);
totalOneDayNumMap.put("normalOneDayInNum", 0L);
totalOneDayNumMap.put("normalOneDayOutNum", 0L);
totalOneDayNumMap.put("transOneDayInNum", 0L);
totalOneDayNumMap.put("scheduleOneDayInNum", 0L);
}
}