提交 13f4297e 编写于 作者: S stevenschew 提交者: yukon

[ROCKETMQ-57] Add unit test for all commands

上级 712dec57
......@@ -68,8 +68,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.junit.After;
import org.junit.Before;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
......@@ -83,19 +83,20 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQAdminExtTest {
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientAPIImpl mQClientAPIImpl;
private Properties properties = new Properties();
private TopicList topicList = new TopicList();
private TopicRouteData topicRouteData = new TopicRouteData();
private KVTable kvTable = new KVTable();
private ClusterInfo clusterInfo = new ClusterInfo();
@Before
public void init() throws Exception {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
private static Properties properties = new Properties();
private static TopicList topicList = new TopicList();
private static TopicRouteData topicRouteData = new TopicRouteData();
private static KVTable kvTable = new KVTable();
private static ClusterInfo clusterInfo = new ClusterInfo();
@BeforeClass
public static void init() throws Exception {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
......@@ -104,6 +105,9 @@ public class DefaultMQAdminExtTest {
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
properties.setProperty("maxMessageSize", "5000000");
properties.setProperty("flushDelayOffsetInterval", "15000");
......@@ -223,15 +227,15 @@ public class DefaultMQAdminExtTest {
when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
}
@After
public void terminate() throws Exception {
@AfterClass
public static void terminate() throws Exception {
if (defaultMQAdminExtImpl != null)
defaultMQAdminExtImpl.shutdown();
defaultMQAdminExt.shutdown();
}
@Test
public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911");
Properties result = defaultMQAdminExt.getBrokerConfig("127.0.0.1:10911");
assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000");
assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000");
assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350");
......@@ -239,21 +243,21 @@ public class DefaultMQAdminExtTest {
@Test
public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList();
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
assertThat(topicList.getTopicList().size()).isEqualTo(2);
assertThat(topicList.getTopicList()).contains("topic_one");
}
@Test
public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911");
KVTable brokerStats = defaultMQAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911");
assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
}
@Test
public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
assertThat(brokerList.containsKey("broker-test")).isTrue();
......@@ -272,32 +276,32 @@ public class DefaultMQAdminExtTest {
@Test
public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
ConsumeStats consumeStats = defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", "unit-test");
ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats("default-consumer-group", "unit-test");
assertThat(consumeStats.getConsumeTps()).isEqualTo(1234);
}
@Test
public void testExamineConsumerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
ConsumerConnection consumerConnection = defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group");
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo("default-consumer-group");
assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY);
assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
}
@Test
public void testExamineProducerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
ProducerConnection producerConnection = defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", "unit-test");
ProducerConnection producerConnection = defaultMQAdminExt.examineProducerConnectionInfo("default-producer-group", "unit-test");
assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1);
}
@Test
public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException {
int result = defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker");
int result = defaultMQAdminExt.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker");
assertThat(result).isEqualTo(6);
}
@Test
public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo("UnitTest");
assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster");
}
......@@ -308,53 +312,53 @@ public class DefaultMQAdminExtTest {
result.add("default-name-one");
result.add("default-name-two");
when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
List<String> nameList = defaultMQAdminExtImpl.getNameServerAddressList();
List<String> nameList = defaultMQAdminExt.getNameServerAddressList();
assertThat(nameList.get(0)).isEqualTo("default-name-one");
assertThat(nameList.get(1)).isEqualTo("default-name-two");
}
@Test
public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException {
String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest");
String topicConfig = defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest");
assertThat(topicConfig).isEqualTo("topicListConfig");
KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
KVTable kvs = defaultMQAdminExt.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one");
assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster");
}
@Test
public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest");
GroupList groupList = defaultMQAdminExt.queryTopicConsumeByWho("UnitTest");
assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
}
@Test
public void testQueryConsumeTimeSpan() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
List<QueueTimeSpan> result = defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group");
List<QueueTimeSpan> result = defaultMQAdminExt.queryConsumeTimeSpan("unit-test", "default-broker-group");
assertThat(result.size()).isEqualTo(0);
}
@Test
public void testCleanExpiredConsumerQueue() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
boolean result = defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster");
boolean result = defaultMQAdminExt.cleanExpiredConsumerQueue("default-cluster");
assertThat(result).isFalse();
}
@Test
public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
boolean clean = defaultMQAdminExt.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
assertThat(clean).isTrue();
}
@Test
public void testCleanUnusedTopic() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
boolean result = defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster");
boolean result = defaultMQAdminExt.cleanUnusedTopic("default-cluster");
assertThat(result).isFalse();
}
@Test
public void testGetConsumerRunningInfo() throws RemotingException, MQClientException, InterruptedException {
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", false);
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo("consumer-group", "cid_123", false);
assertThat(consumerRunningInfo.getJstack()).isEqualTo("test");
}
......@@ -363,25 +367,25 @@ public class DefaultMQAdminExtTest {
MessageExt messageExt = new MessageExt();
messageExt.setMsgId("msgId");
messageExt.setTopic("unit-test");
List<MessageTrack> messageTrackList = defaultMQAdminExtImpl.messageTrackDetail(messageExt);
List<MessageTrack> messageTrackList = defaultMQAdminExt.messageTrackDetail(messageExt);
assertThat(messageTrackList.size()).isEqualTo(2);
}
@Test
public void testGetConsumeStatus() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Map<String, Map<MessageQueue, Long>> result = defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911");
Map<String, Map<MessageQueue, Long>> result = defaultMQAdminExt.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911");
assertThat(result.size()).isEqualTo(0);
}
@Test
public void testGetTopicClusterList() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Set<String> result = defaultMQAdminExtImpl.getTopicClusterList("unit-test");
Set<String> result = defaultMQAdminExt.getTopicClusterList("unit-test");
assertThat(result.size()).isEqualTo(0);
}
@Test
public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
Set<String> clusterlist = defaultMQAdminExt.getClusterList("UnitTest");
assertThat(clusterlist.contains("default-cluster-one")).isTrue();
assertThat(clusterlist.contains("default-cluster-two")).isTrue();
}
......@@ -391,13 +395,13 @@ public class DefaultMQAdminExtTest {
ConsumeStatsList result = new ConsumeStatsList();
result.setBrokerAddr("127.0.0.1:10911");
when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result);
ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
ConsumeStatsList consumeStatsList = defaultMQAdminExt.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
}
@Test
public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
......
......@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.tools.command;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -33,9 +39,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class BrokerConsumeStatsSubCommadTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
ConsumeStatsList consumeStatsList = new ConsumeStatsList();
consumeStatsList.setBrokerAddr("127.0l.0.1:10911");
consumeStatsList.setConsumeStatsList(new ArrayList<Map<String, List<ConsumeStats>>>());
consumeStatsList.setTotalDiff(123);
when(mQClientAPIImpl.fetchConsumeStatsInBroker(anyString(), anyBoolean(), anyLong())).thenReturn(consumeStatsList);
}
@AfterClass
public static void terminate() {
}
@Test
public void testExecute() {
BrokerConsumeStatsSubCommad cmd = new BrokerConsumeStatsSubCommad();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-t 3000", "-l 5", "-o true"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
import java.lang.reflect.Field;
import java.util.HashMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class BrokerStatusSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
KVTable kvTable = new KVTable();
kvTable.setTable(new HashMap<String, String>());
when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
BrokerStatusSubCommand cmd = new BrokerStatusSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
import java.lang.reflect.Field;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class CleanExpiredCQSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
CleanExpiredCQSubCommand cmd = new CleanExpiredCQSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
import java.lang.reflect.Field;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class CleanUnusedTopicCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
when(mQClientAPIImpl.cleanUnusedTopicByAddr(anyString(), anyLong())).thenReturn(true);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
CleanUnusedTopicCommand cmd = new CleanUnusedTopicCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class GetBrokerConfigCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
Properties properties = new Properties();
properties.setProperty("maxMessageSize", "5000000");
properties.setProperty("flushDelayOffsetInterval", "15000");
properties.setProperty("serverSocketRcvBufSize", "655350");
when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
GetBrokerConfigCommand cmd = new GetBrokerConfigCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
import java.lang.reflect.Field;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.Mockito.mock;
public class SendMsgStatusCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
SendMsgStatusCommand cmd = new SendMsgStatusCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-s 1024 -c 10"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
//cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.Mockito.mock;
public class UpdateBrokerConfigSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
UpdateBrokerConfigSubCommand cmd = new UpdateBrokerConfigSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-k topicname", "-v unit_test"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.connection;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConsumerConnectionSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
ConsumerConnection consumerConnection = new ConsumerConnection();
consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
consumerConnection.setMessageModel(MessageModel.CLUSTERING);
HashSet<Connection> connections = new HashSet<>();
connections.add(new Connection());
consumerConnection.setConnectionSet(connections);
consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
ConsumerConnectionSubCommand cmd = new ConsumerConnectionSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-consumer-group"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.connection;
import java.lang.reflect.Field;
import java.util.HashSet;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ProducerConnectionSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
ProducerConnection producerConnection = new ProducerConnection();
Connection connection = new Connection();
connection.setClientAddr("127.0.0.1:9898");
connection.setClientId("PID_12345");
HashSet<Connection> connectionSet = new HashSet<>();
connectionSet.add(connection);
producerConnection.setConnectionSet(connectionSet);
when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
ProducerConnectionSubCommand cmd = new ProducerConnectionSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-producer-group", "-t unit-test"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.consumer;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConsumerProgressSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
TopicRouteData topicRouteData = new TopicRouteData();
List<BrokerData> brokerDatas = new ArrayList<>();
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(1234l, "127.0.0.1:10911");
BrokerData brokerData = new BrokerData();
brokerData.setCluster("default-cluster");
brokerData.setBrokerName("default-broker");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDatas.add(brokerData);
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList<QueueData>());
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
ConsumeStats consumeStats = new ConsumeStats();
consumeStats.setConsumeTps(1234);
MessageQueue messageQueue = new MessageQueue();
OffsetWrapper offsetWrapper = new OffsetWrapper();
HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
stats.put(messageQueue, offsetWrapper);
consumeStats.setOffsetTable(stats);
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
ConsumerProgressSubCommand cmd = new ConsumerProgressSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.consumer;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConsumerStatusSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
TopicRouteData topicRouteData = new TopicRouteData();
List<BrokerData> brokerDatas = new ArrayList<>();
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(1234l, "127.0.0.1:10911");
BrokerData brokerData = new BrokerData();
brokerData.setCluster("default-cluster");
brokerData.setBrokerName("default-broker");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDatas.add(brokerData);
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList<QueueData>());
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
ConsumerConnection consumerConnection = new ConsumerConnection();
consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
consumerConnection.setMessageModel(MessageModel.CLUSTERING);
HashSet<Connection> connections = new HashSet<>();
connections.add(new Connection());
consumerConnection.setConnectionSet(connections);
consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
consumerRunningInfo.setJstack("test");
consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>());
consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
ConsumerStatusSubCommand cmd = new ConsumerStatusSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group", "-i cid_one"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class GetNamesrvConfigCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
Map<String, Properties> propertiesMap = new HashMap<>();
List<String> nameServers = new ArrayList<>();
when(mQClientAPIImpl.getNameServerConfig(ArgumentMatchers.<String>anyList(), anyLong())).thenReturn(propertiesMap);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
GetNamesrvConfigCommand cmd = new GetNamesrvConfigCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class WipeWritePermSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, RemotingCommandException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
List<String> result = new ArrayList<>();
result.add("default-name-one");
result.add("default-name-two");
when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
WipeWritePermSubCommand cmd = new WipeWritePermSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b default-broker"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.offset;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class GetConsumerStatusCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
Map<String, Map<MessageQueue, Long>> consumerStatus = new HashMap<>();
when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
GetConsumerStatusCommand cmd = new GetConsumerStatusCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group", "-t unit-test", "-i clientid"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.offset;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ResetOffsetByTimeCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
TopicRouteData topicRouteData = new TopicRouteData();
List<BrokerData> brokerDatas = new ArrayList<>();
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(1234l, "127.0.0.1:10911");
BrokerData brokerData = new BrokerData();
brokerData.setCluster("default-cluster");
brokerData.setBrokerName("default-broker");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDatas.add(brokerData);
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList<QueueData>());
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
Map<MessageQueue, Long> messageQueueLongMap = new HashMap<>();
when(mQClientAPIImpl.invokeBrokerToResetOffset(anyString(), anyString(), anyString(), anyLong(), anyBoolean(), anyLong())).thenReturn(messageQueueLongMap);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Test
public void testExecute() {
ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group", "-t unit-test", "-s 1412131213231", "-f false"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.offset;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ResetOffsetByTimeOldCommandTest {
@Test
public void testExecute() {
ResetOffsetByTimeOldCommand cmd = new ResetOffsetByTimeOldCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group", "-t unit-test", "-s 1412131213231", "-f false"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("default-group");
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("1412131213231");
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class AllocateMQSubCommandTest {
@Test
public void testExecute() {
AllocateMQSubCommand cmd = new AllocateMQSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t unit-test", "-i 127.0.0.1:10911"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
assertThat(commandLine.getOptionValue("i").trim()).isEqualTo("127.0.0.1:10911");
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class DeleteTopicSubCommandTest {
@Test
public void testExecute() {
DeleteTopicSubCommand cmd = new DeleteTopicSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t unit-test", "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
assertThat(commandLine.getOptionValue("c").trim()).isEqualTo("default-cluster");
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class TopicClusterSubCommandTest {
@Test
public void testExecute() {
TopicClusterSubCommand cmd = new TopicClusterSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t unit-test"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class TopicRouteSubCommandTest {
@Test
public void testExecute() {
TopicRouteSubCommand cmd = new TopicRouteSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t unit-test"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class TopicStatusSubCommandTest {
@Test
public void testExecute() {
TopicStatusSubCommand cmd = new TopicStatusSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t unit-test"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class UpdateOrderConfCommandTest {
@Test
public void testExecute() {
UpdateOrderConfCommand cmd = new UpdateOrderConfCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t unit-test", "-v default-broker:8", "-m post"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
assertThat(commandLine.getOptionValue('v').trim()).isEqualTo("default-broker:8");
assertThat(commandLine.getOptionValue('m').trim()).isEqualTo("post");
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class UpdateTopicPermSubCommandTest {
@Test
public void testExecute() {
UpdateTopicPermSubCommand cmd = new UpdateTopicPermSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-t unit-test", "-p 6"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class UpdateTopicSubCommandTest {
@Test
public void testExecute() {
UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {
"-b 127.0.0.1:10911",
"-c default-cluster",
"-t unit-test",
"-r 8",
"-w 8",
"-p 6",
"-o false",
"-u false",
"-s false"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
assertThat(commandLine.getOptionValue('o').trim()).isEqualTo("false");
assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("false");
assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("false");
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.monitor;
import java.util.Properties;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.mock;
public class DefaultMonitorListenerTest {
private DefaultMonitorListener defaultMonitorListener;
@Before
public void init() {
defaultMonitorListener = mock(DefaultMonitorListener.class);
}
@Test
public void testBeginRound() {
defaultMonitorListener.beginRound();
}
@Test
public void testReportUndoneMsgs() {
UndoneMsgs undoneMsgs = new UndoneMsgs();
undoneMsgs.setConsumerGroup("default-group");
undoneMsgs.setTopic("unit-test");
undoneMsgs.setUndoneMsgsDelayTimeMills(30000);
undoneMsgs.setUndoneMsgsSingleMQ(1);
undoneMsgs.setUndoneMsgsTotal(100);
defaultMonitorListener.reportUndoneMsgs(undoneMsgs);
}
@Test
public void testReportFailedMsgs() {
FailedMsgs failedMsgs = new FailedMsgs();
failedMsgs.setTopic("unit-test");
failedMsgs.setConsumerGroup("default-consumer");
failedMsgs.setFailedMsgsTotalRecently(2);
defaultMonitorListener.reportFailedMsgs(failedMsgs);
}
@Test
public void testReportDeleteMsgsEvent() {
DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent();
deleteMsgsEvent.setEventTimestamp(System.currentTimeMillis());
deleteMsgsEvent.setOffsetMovedEvent(new OffsetMovedEvent());
defaultMonitorListener.reportDeleteMsgsEvent(deleteMsgsEvent);
}
@Test
public void testReportConsumerRunningInfo() {
TreeMap<String, ConsumerRunningInfo> criTable = new TreeMap<>();
ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>());
consumerRunningInfo.setProperties(new Properties());
criTable.put("test", consumerRunningInfo);
defaultMonitorListener.reportConsumerRunningInfo(criTable);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.monitor;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MonitorServiceTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
private static MonitorConfig monitorConfig;
private static MonitorListener monitorListener;
private static DefaultMQPullConsumer defaultMQPullConsumer;
private static DefaultMQPushConsumer defaultMQPushConsumer;
private static MonitorService monitorService;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, RemotingException, MQClientException, InterruptedException, MQBrokerException {
monitorConfig = new MonitorConfig();
monitorListener = new DefaultMonitorListener();
defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
defaultMQPushConsumer = mock(DefaultMQPushConsumer.class);
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
monitorService = new MonitorService(monitorConfig, monitorListener, null);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
field = MonitorService.class.getDeclaredField("defaultMQAdminExt");
field.setAccessible(true);
field.set(monitorService, defaultMQAdminExt);
field = MonitorService.class.getDeclaredField("defaultMQPullConsumer");
field.setAccessible(true);
field.set(monitorService, defaultMQPullConsumer);
field = MonitorService.class.getDeclaredField("defaultMQPushConsumer");
field.setAccessible(true);
field.set(monitorService, defaultMQPushConsumer);
TopicList topicList = new TopicList();
Set<String> topicSet = new HashSet<>();
topicSet.add("topic_one");
topicSet.add("topic_two");
topicList.setTopicList(topicSet);
when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
TopicRouteData topicRouteData = new TopicRouteData();
List<BrokerData> brokerDatas = new ArrayList<>();
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(1234l, "127.0.0.1:10911");
BrokerData brokerData = new BrokerData();
brokerData.setCluster("default-cluster");
brokerData.setBrokerName("default-broker");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDatas.add(brokerData);
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList<QueueData>());
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
ConsumeStats consumeStats = new ConsumeStats();
consumeStats.setConsumeTps(1234);
MessageQueue messageQueue = new MessageQueue();
OffsetWrapper offsetWrapper = new OffsetWrapper();
HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
stats.put(messageQueue, offsetWrapper);
consumeStats.setOffsetTable(stats);
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats);
ConsumerConnection consumerConnection = new ConsumerConnection();
consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
consumerConnection.setMessageModel(MessageModel.CLUSTERING);
HashSet<Connection> connections = new HashSet<>();
Connection connection = new Connection();
connection.setClientId("client_id");
connection.setClientAddr("127.0.0.1:109111");
connection.setLanguage(LanguageCode.JAVA);
connection.setVersion(MQVersion.Version.V4_0_0_SNAPSHOT.ordinal());
connections.add(connection);
consumerConnection.setConnectionSet(connections);
consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
consumerRunningInfo.setJstack("test");
consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>());
consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
Properties properties = new Properties();
properties.put(ConsumerRunningInfo.PROP_CONSUME_TYPE, CONSUME_ACTIVELY);
properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, System.currentTimeMillis());
consumerRunningInfo.setProperties(properties);
when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
}
@AfterClass
public static void terminate() {
}
@Test
public void testDoMonitorWork() throws RemotingException, MQClientException, InterruptedException {
monitorService.doMonitorWork();
}
@Test
public void testReportConsumerRunningInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
monitorService.reportConsumerRunningInfo("test_group");
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册