提交 39409245 编写于 作者: S shroman 提交者: yukon

[ROCKETMQ-104] Make MQAdmin commands throw exceptions, closes apache/incubator-rocketmq#65

上级 d545f86f
......@@ -21,11 +21,11 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
public interface SubCommand {
public String commandName();
String commandName();
public String commandDesc();
String commandDesc();
public Options buildCommandlineOptions(final Options options);
Options buildCommandlineOptions(final Options options);
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook);
void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command;
public class SubCommandException extends Exception {
private static final long serialVersionUID = 0L;
/**
* @param msg Message.
*/
public SubCommandException(String msg) {
super(msg);
}
/**
* @param msg Message.
* @param cause Cause.
*/
public SubCommandException(String msg, Throwable cause) {
super(msg, cause);
}
}
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class BrokerConsumeStatsSubCommad implements SubCommand {
......@@ -67,7 +68,7 @@ public class BrokerConsumeStatsSubCommad implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -134,7 +135,7 @@ public class BrokerConsumeStatsSubCommad implements SubCommand {
}
System.out.printf("%nDiff Total: %d%n", consumeStatsList.getTotalDiff());
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class BrokerStatusSubCommand implements SubCommand {
......@@ -59,7 +60,7 @@ public class BrokerStatusSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -84,7 +85,7 @@ public class BrokerStatusSubCommand implements SubCommand {
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -23,6 +23,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class CleanExpiredCQSubCommand implements SubCommand {
......@@ -50,7 +51,7 @@ public class CleanExpiredCQSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -69,7 +70,7 @@ public class CleanExpiredCQSubCommand implements SubCommand {
}
System.out.printf(result ? "success" : "false");
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -23,6 +23,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class CleanUnusedTopicCommand implements SubCommand {
......@@ -50,7 +51,7 @@ public class CleanUnusedTopicCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -69,7 +70,7 @@ public class CleanUnusedTopicCommand implements SubCommand {
}
System.out.printf(result ? "success" : "false");
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -33,6 +33,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class GetBrokerConfigCommand implements SubCommand {
@Override
......@@ -59,7 +60,7 @@ public class GetBrokerConfigCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -100,7 +101,7 @@ public class GetBrokerConfigCommand implements SubCommand {
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class SendMsgStatusCommand implements SubCommand {
......@@ -69,7 +70,7 @@ public class SendMsgStatusCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook);
producer.setInstanceName("PID_SMSC_" + System.currentTimeMillis());
......@@ -87,7 +88,7 @@ public class SendMsgStatusCommand implements SubCommand {
System.out.printf("rt:" + (System.currentTimeMillis() - begin) + "ms, SendResult=" + result);
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
producer.shutdown();
}
......
......@@ -26,6 +26,7 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class UpdateBrokerConfigSubCommand implements SubCommand {
......@@ -61,7 +62,7 @@ public class UpdateBrokerConfigSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -101,7 +102,7 @@ public class UpdateBrokerConfigSubCommand implements SubCommand {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class CLusterSendMsgRTCommand implements SubCommand {
......@@ -80,7 +81,7 @@ public class CLusterSendMsgRTCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -183,7 +184,7 @@ public class CLusterSendMsgRTCommand implements SubCommand {
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
producer.shutdown();
......
......@@ -33,6 +33,7 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ClusterListSubCommand implements SubCommand {
......@@ -60,7 +61,7 @@ public class ClusterListSubCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -88,7 +89,7 @@ public class ClusterListSubCommand implements SubCommand {
}
while (enableInterval);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ConsumerConnectionSubCommand implements SubCommand {
......@@ -51,7 +52,7 @@ public class ConsumerConnectionSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -92,7 +93,7 @@ public class ConsumerConnectionSubCommand implements SubCommand {
System.out.printf("MessageModel: %s%n", cc.getMessageModel());
System.out.printf("ConsumeFromWhere: %s%n", cc.getConsumeFromWhere());
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -25,6 +25,7 @@ import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ProducerConnectionSubCommand implements SubCommand {
......@@ -52,7 +53,7 @@ public class ProducerConnectionSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -76,7 +77,7 @@ public class ProducerConnectionSubCommand implements SubCommand {
);
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;
public class ConsumerProgressSubCommand implements SubCommand {
......@@ -62,7 +63,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -168,7 +169,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
}
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -31,6 +31,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.MQAdminStartup;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ConsumerStatusSubCommand implements SubCommand {
......@@ -67,7 +68,7 @@ public class ConsumerStatusSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -130,7 +131,7 @@ public class ConsumerStatusSubCommand implements SubCommand {
}
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -31,6 +31,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.MQAdminStartup;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ConsumerSubCommand implements SubCommand {
......@@ -63,7 +64,7 @@ public class ConsumerSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -129,7 +130,7 @@ public class ConsumerSubCommand implements SubCommand {
}
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -26,6 +26,7 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
public class DeleteSubscriptionGroupCommand implements SubCommand {
......@@ -57,7 +58,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -98,7 +99,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
adminExt.shutdown();
}
......
......@@ -21,6 +21,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.monitor.DefaultMonitorListener;
import org.apache.rocketmq.tools.monitor.MonitorConfig;
import org.apache.rocketmq.tools.monitor.MonitorService;
......@@ -45,14 +46,14 @@ public class StartMonitoringSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
try {
MonitorService monitorService =
new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);
monitorService.start();
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
}
}
}
......@@ -26,6 +26,7 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class UpdateSubGroupSubCommand implements SubCommand {
......@@ -89,7 +90,7 @@ public class UpdateSubGroupSubCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -180,7 +181,7 @@ public class UpdateSubGroupSubCommand implements SubCommand {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -27,6 +27,7 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class CheckMsgSendRTCommand implements SubCommand {
private static String brokerName = "";
......@@ -59,7 +60,7 @@ public class CheckMsgSendRTCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
producer.setProducerGroup(Long.toString(System.currentTimeMillis()));
......@@ -117,7 +118,7 @@ public class CheckMsgSendRTCommand implements SubCommand {
double rt = (double) timeElapsed / (amount - 1);
System.out.printf("Avg RT: %s%n", String.format("%.2f", rt));
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
producer.shutdown();
}
......
......@@ -23,6 +23,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class DecodeMessageIdCommond implements SubCommand {
@Override
......@@ -44,7 +45,7 @@ public class DecodeMessageIdCommond implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
String messageId = commandLine.getOptionValue('i').trim();
try {
......@@ -57,7 +58,7 @@ public class DecodeMessageIdCommond implements SubCommand {
String date = UtilAll.formatDate(MessageClientIDSetter.getNearlyTimeFromID(messageId), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
System.out.printf("date=" + date);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
}
}
}
......@@ -36,6 +36,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class PrintMessageByQueueCommand implements SubCommand {
......@@ -154,7 +155,7 @@ public class PrintMessageByQueueCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
try {
......@@ -214,7 +215,7 @@ public class PrintMessageByQueueCommand implements SubCommand {
printCalculateByTag(tagCalmap, calByTag);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
consumer.shutdown();
}
......
......@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class PrintMessageSubCommand implements SubCommand {
......@@ -101,7 +102,7 @@ public class PrintMessageSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
try {
......@@ -161,7 +162,7 @@ public class PrintMessageSubCommand implements SubCommand {
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
consumer.shutdown();
}
......
......@@ -39,6 +39,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class QueryMsgByIdSubCommand implements SubCommand {
public static void queryById(final DefaultMQAdminExt admin, final String msgId) throws MQClientException,
......@@ -209,7 +210,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById");
......@@ -254,7 +255,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQProducer.shutdown();
defaultMQAdminExt.shutdown();
......
......@@ -25,6 +25,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class QueryMsgByKeySubCommand implements SubCommand {
......@@ -52,7 +53,7 @@ public class QueryMsgByKeySubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -63,7 +64,7 @@ public class QueryMsgByKeySubCommand implements SubCommand {
this.queryByKey(defaultMQAdminExt, topic, key);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class QueryMsgByOffsetSubCommand implements SubCommand {
......@@ -61,7 +62,7 @@ public class QueryMsgByOffsetSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
......@@ -96,7 +97,7 @@ public class QueryMsgByOffsetSubCommand implements SubCommand {
}
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQPullConsumer.shutdown();
defaultMQAdminExt.shutdown();
......
......@@ -35,6 +35,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class QueryMsgByUniqueKeySubCommand implements SubCommand {
......@@ -179,7 +180,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -198,7 +199,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
queryById(defaultMQAdminExt, topic, msgId);
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.message;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
public class Store {
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
private MappedFileQueue mapedFileQueue;
private ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
private String cStorePath;
private int cSize;
private String lStorePath;
private int lSize;
public Store(String cStorePath, int cSize, String lStorePath, int lSize) {
this.cStorePath = cStorePath;
this.cSize = cSize;
this.lStorePath = lStorePath;
this.lSize = lSize;
mapedFileQueue = new MappedFileQueue(cStorePath, cSize, null);
consumeQueueTable =
new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>();
}
public boolean load() {
boolean result = this.mapedFileQueue.load();
System.out.printf("load commit log " + (result ? "OK" : "Failed"));
if (result) {
result = loadConsumeQueue();
}
System.out.printf("load logics log " + (result ? "OK" : "Failed"));
return result;
}
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(lStorePath));
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
for (File fileTopic : fileTopicList) {
String topic = fileTopic.getName();
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {
int queueId = Integer.parseInt(fileQueueId.getName());
ConsumeQueue logic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),
lSize,
null);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
}
}
}
}
}
System.out.printf("load logics queue all over, OK");
return true;
}
private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {
ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);
if (null == map) {
map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();
map.put(queueId, consumeQueue);
this.consumeQueueTable.put(topic, map);
} else {
map.put(queueId, consumeQueue);
}
}
public void traval(boolean openAll) {
boolean success = true;
byte[] bytesContent = new byte[1024];
List<MappedFile> mapedFiles = this.mapedFileQueue.getMappedFiles();
ALL:
for (MappedFile mapedFile : mapedFiles) {
long startOffset = mapedFile.getFileFromOffset();
int position = 0;
int msgCount = 0;
int errorCount = 0;
System.out.printf("start travel " + mapedFile.getFileName());
long startTime = System.currentTimeMillis();
ByteBuffer byteBuffer = mapedFile.sliceByteBuffer();
while (byteBuffer.hasRemaining()) {
// 1 TOTALSIZE
int totalSize = byteBuffer.getInt();
// 2 MAGICCODE
int magicCode = byteBuffer.getInt();
if (BLANK_MAGIC_CODE == magicCode) {
position = byteBuffer.limit();
break;
}
// 3 BODYCRC
int bodyCRC = byteBuffer.getInt();
// 4 QUEUEID
int queueId = byteBuffer.getInt();
// 5 FLAG
int flag = byteBuffer.getInt();
// 6 QUEUEOFFSET
long queueOffset = byteBuffer.getLong();
// 7 PHYSICALOFFSET
long physicOffset = byteBuffer.getLong();
// 8 SYSFLAG
int sysFlag = byteBuffer.getInt();
// 9 BORNTIMESTAMP
long bornTimeStamp = byteBuffer.getLong();
// 10 BORNHOST(IP+PORT)
byteBuffer.position(byteBuffer.position() + 8);
// 11 STORETIMESTAMP
long storeTimestamp = byteBuffer.getLong();
// 12 STOREHOST(IP+PORT)
byteBuffer.position(byteBuffer.position() + 8);
// 13 RECONSUMETIMES
int reconsumeTimes = byteBuffer.getInt();
// 14 Prepared Transaction Offset
long preparedTransactionOffset = byteBuffer.getLong();
// 15 BODY
int bodyLen = byteBuffer.getInt();
if (bodyLen > 0) {
byteBuffer.position(byteBuffer.position() + bodyLen);
}
// 16 TOPIC
byte topicLen = byteBuffer.get();
byteBuffer.get(bytesContent, 0, topicLen);
String topic = null;
try {
topic = new String(bytesContent, 0, topicLen, MixAll.DEFAULT_CHARSET);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
Date storeTime = new Date(storeTimestamp);
long currentPhyOffset = startOffset + position;
if (physicOffset != currentPhyOffset) {
System.out.printf(storeTime
+ " [fetal error] physicOffset != currentPhyOffset. position=" + position
+ ", msgCount=" + msgCount + ", physicOffset=" + physicOffset
+ ", currentPhyOffset=" + currentPhyOffset);
errorCount++;
if (!openAll) {
success = false;
break ALL;
}
}
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
SelectMappedBufferResult smb = consumeQueue.getIndexBuffer(queueOffset);
try {
long offsetPy = smb.getByteBuffer().getLong();
int sizePy = smb.getByteBuffer().getInt();
if (physicOffset != offsetPy) {
System.out.printf(storeTime + " [fetal error] physicOffset != offsetPy. position="
+ position + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset
+ ", offsetPy=" + offsetPy);
errorCount++;
if (!openAll) {
success = false;
break ALL;
}
}
if (totalSize != sizePy) {
System.out.printf(storeTime + " [fetal error] totalSize != sizePy. position="
+ position + ", msgCount=" + msgCount + ", totalSize=" + totalSize
+ ", sizePy=" + sizePy);
errorCount++;
if (!openAll) {
success = false;
break ALL;
}
}
} finally {
smb.release();
}
msgCount++;
position += totalSize;
byteBuffer.position(position);
}
System.out.printf("end travel " + mapedFile.getFileName() + ", total msg=" + msgCount
+ ", error count=" + errorCount + ", cost:" + (System.currentTimeMillis() - startTime));
}
System.out.printf("travel " + (success ? "ok" : "fail"));
}
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap =
new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),
lSize,
null);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
}
\ No newline at end of file
......@@ -22,6 +22,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class DeleteKvConfigCommand implements SubCommand {
@Override
......@@ -47,7 +48,7 @@ public class DeleteKvConfigCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -60,7 +61,7 @@ public class DeleteKvConfigCommand implements SubCommand {
defaultMQAdminExt.deleteKvConfig(namespace, key);
System.out.printf("delete kv config from namespace success.%n");
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -26,6 +26,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class GetNamesrvConfigCommand implements SubCommand {
......@@ -45,7 +46,7 @@ public class GetNamesrvConfigCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -72,7 +73,7 @@ public class GetNamesrvConfigCommand implements SubCommand {
}
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -22,6 +22,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class UpdateKvConfigCommand implements SubCommand {
@Override
......@@ -51,7 +52,7 @@ public class UpdateKvConfigCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -66,7 +67,7 @@ public class UpdateKvConfigCommand implements SubCommand {
defaultMQAdminExt.createAndUpdateKvConfig(namespace, key, value);
System.out.printf("create or update kv config to namespace success.%n");
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -26,6 +26,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class UpdateNamesrvConfigCommand implements SubCommand {
@Override
......@@ -52,7 +53,7 @@ public class UpdateNamesrvConfigCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -81,7 +82,7 @@ public class UpdateNamesrvConfigCommand implements SubCommand {
System.out.printf("update name server config success!%s\n%s : %s\n",
serverList == null ? "" : serverList, key, value);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -23,6 +23,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class WipeWritePermSubCommand implements SubCommand {
......@@ -45,7 +46,7 @@ public class WipeWritePermSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -74,7 +75,7 @@ public class WipeWritePermSubCommand implements SubCommand {
}
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class CloneGroupOffsetCommand implements SubCommand {
@Override
......@@ -64,7 +65,7 @@ public class CloneGroupOffsetCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
String srcGroup = commandLine.getOptionValue("s").trim();
String destGroup = commandLine.getOptionValue("d").trim();
String topic = commandLine.getOptionValue("t").trim();
......@@ -95,7 +96,7 @@ public class CloneGroupOffsetCommand implements SubCommand {
System.out.printf("clone group offset success. srcGroup[%s], destGroup=[%s], topic[%s]",
srcGroup, destGroup, topic);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class GetConsumerStatusCommand implements SubCommand {
@Override
......@@ -56,7 +57,7 @@ public class GetConsumerStatusCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -92,7 +93,7 @@ public class GetConsumerStatusCommand implements SubCommand {
}
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -22,25 +22,16 @@ import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ResetOffsetByTimeCommand implements SubCommand {
public static void main(String[] args) {
ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t Jodie_rest_test", "-g CID_Jodie_rest_test", "-s -1", "-f true"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
@Override
public String commandName() {
......@@ -77,7 +68,7 @@ public class ResetOffsetByTimeCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -134,7 +125,7 @@ public class ResetOffsetByTimeCommand implements SubCommand {
entry.getValue());
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -30,9 +30,11 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ResetOffsetByTimeOldCommand implements SubCommand {
public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup, String topic, long timestamp, boolean force,
public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup, String topic,
long timestamp, boolean force,
String timeStampStr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
List<RollbackStats> rollbackStatsList = defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
System.out.printf(
......@@ -91,7 +93,7 @@ public class ResetOffsetByTimeOldCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -121,7 +123,7 @@ public class ResetOffsetByTimeOldCommand implements SubCommand {
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -35,6 +35,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class StatsAllSubCommand implements SubCommand {
public static void printTopicDetail(final DefaultMQAdminExt admin, final String topic, final boolean activeTopic)
......@@ -160,7 +161,7 @@ public class StatsAllSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -198,7 +199,7 @@ public class StatsAllSubCommand implements SubCommand {
}
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -31,6 +31,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class AllocateMQSubCommand implements SubCommand {
@Override
......@@ -57,7 +58,7 @@ public class AllocateMQSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -86,7 +87,7 @@ public class AllocateMQSubCommand implements SubCommand {
final String json = RemotingSerializable.toJson(rr, false);
System.out.printf("%s%n", json);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
adminExt.shutdown();
}
......
......@@ -30,6 +30,7 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class DeleteTopicSubCommand implements SubCommand {
public static void deleteTopic(final DefaultMQAdminExt adminExt,
......@@ -75,7 +76,7 @@ public class DeleteTopicSubCommand implements SubCommand {
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -91,7 +92,7 @@ public class DeleteTopicSubCommand implements SubCommand {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
adminExt.shutdown();
}
......
......@@ -23,6 +23,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class TopicClusterSubCommand implements SubCommand {
......@@ -45,7 +46,7 @@ public class TopicClusterSubCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
String topic = commandLine.getOptionValue('t').trim();
......@@ -56,7 +57,7 @@ public class TopicClusterSubCommand implements SubCommand {
System.out.printf("%s%n", value);
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -34,6 +34,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class TopicListSubCommand implements SubCommand {
......@@ -56,7 +57,7 @@ public class TopicListSubCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -108,7 +109,7 @@ public class TopicListSubCommand implements SubCommand {
}
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -23,6 +23,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class TopicRouteSubCommand implements SubCommand {
......@@ -46,7 +47,7 @@ public class TopicRouteSubCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -59,7 +60,7 @@ public class TopicRouteSubCommand implements SubCommand {
String json = topicRouteData.toJson(true);
System.out.printf("%s%n", json);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class TopicStatusSubCommand implements SubCommand {
......@@ -51,7 +52,7 @@ public class TopicStatusSubCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -90,7 +91,7 @@ public class TopicStatusSubCommand implements SubCommand {
);
}
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -25,6 +25,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class UpdateOrderConfCommand implements SubCommand {
......@@ -56,7 +57,7 @@ public class UpdateOrderConfCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -98,7 +99,7 @@ public class UpdateOrderConfCommand implements SubCommand {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class UpdateTopicPermSubCommand implements SubCommand {
......@@ -64,7 +65,7 @@ public class UpdateTopicPermSubCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......@@ -110,7 +111,7 @@ public class UpdateTopicPermSubCommand implements SubCommand {
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -27,6 +27,7 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class UpdateTopicSubCommand implements SubCommand {
......@@ -82,7 +83,7 @@ public class UpdateTopicSubCommand implements SubCommand {
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
......@@ -177,7 +178,7 @@ public class UpdateTopicSubCommand implements SubCommand {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
e.printStackTrace();
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
......
......@@ -36,8 +36,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyBoolean;
......@@ -79,8 +81,9 @@ public class BrokerConsumeStatsSubCommadTest {
public static void terminate() {
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
BrokerConsumeStatsSubCommad cmd = new BrokerConsumeStatsSubCommad();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-t 3000", "-l 5", "-o true"};
......@@ -88,5 +91,4 @@ public class BrokerConsumeStatsSubCommadTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
......@@ -34,8 +34,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -75,8 +77,9 @@ public class BrokerStatusSubCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
BrokerStatusSubCommand cmd = new BrokerStatusSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
......@@ -84,5 +87,4 @@ public class BrokerStatusSubCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
......@@ -32,8 +32,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -71,8 +73,9 @@ public class CleanExpiredCQSubCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
CleanExpiredCQSubCommand cmd = new CleanExpiredCQSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
......@@ -80,5 +83,4 @@ public class CleanExpiredCQSubCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
......@@ -32,8 +32,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -71,8 +73,9 @@ public class CleanUnusedTopicCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
CleanUnusedTopicCommand cmd = new CleanUnusedTopicCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
......@@ -80,5 +83,4 @@ public class CleanUnusedTopicCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
......@@ -34,8 +34,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -77,8 +79,9 @@ public class GetBrokerConfigCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
GetBrokerConfigCommand cmd = new GetBrokerConfigCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
......
......@@ -33,8 +33,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.Mockito.mock;
......@@ -67,8 +69,9 @@ public class UpdateBrokerConfigSubCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
UpdateBrokerConfigSubCommand cmd = new UpdateBrokerConfigSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-k topicname", "-v unit_test"};
......
......@@ -40,8 +40,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -87,8 +89,9 @@ public class ConsumerConnectionSubCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
ConsumerConnectionSubCommand cmd = new ConsumerConnectionSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-consumer-group"};
......
......@@ -35,8 +35,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -81,8 +83,9 @@ public class ProducerConnectionSubCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
ProducerConnectionSubCommand cmd = new ProducerConnectionSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-producer-group", "-t unit-test"};
......
......@@ -39,8 +39,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -99,8 +101,9 @@ public class ConsumerProgressSubCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
ConsumerProgressSubCommand cmd = new ConsumerProgressSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group"};
......
......@@ -50,8 +50,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyBoolean;
......@@ -119,8 +121,9 @@ public class ConsumerStatusSubCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
ConsumerStatusSubCommand cmd = new ConsumerStatusSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group", "-i cid_one"};
......
......@@ -38,8 +38,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
......@@ -79,8 +81,9 @@ public class GetNamesrvConfigCommandTest {
defaultMQAdminExt.shutdown();
}
// @Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
GetNamesrvConfigCommand cmd = new GetNamesrvConfigCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {};
......
......@@ -35,8 +35,10 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -78,8 +80,9 @@ public class WipeWritePermSubCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
WipeWritePermSubCommand cmd = new WipeWritePermSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b default-broker"};
......
......@@ -33,8 +33,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -73,8 +75,9 @@ public class GetConsumerStatusCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
GetConsumerStatusCommand cmd = new GetConsumerStatusCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group", "-t unit-test", "-i clientid"};
......
......@@ -38,8 +38,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyBoolean;
......@@ -93,8 +95,9 @@ public class ResetOffsetByTimeCommandTest {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() {
public void testExecute() throws SubCommandException {
ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group", "-t unit-test", "-s 1412131213231", "-f false"};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册