diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index bb80eb4dc4a1632e4e30c9410d01d11e128ada56..78659800219742a7b2cf19cf188679d2584b13b8 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -68,8 +68,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.api.MessageTrack; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; @@ -83,19 +83,20 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultMQAdminExtTest { - private DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); - private MQClientAPIImpl mQClientAPIImpl; - private Properties properties = new Properties(); - private TopicList topicList = new TopicList(); - private TopicRouteData topicRouteData = new TopicRouteData(); - private KVTable kvTable = new KVTable(); - private ClusterInfo clusterInfo = new ClusterInfo(); - - @Before - public void init() throws Exception { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + private static Properties properties = new Properties(); + private static TopicList topicList = new TopicList(); + private static TopicRouteData topicRouteData = new TopicRouteData(); + private static KVTable kvTable = new KVTable(); + private static ClusterInfo clusterInfo = new ClusterInfo(); + + @BeforeClass + public static void init() throws Exception { mQClientAPIImpl = mock(MQClientAPIImpl.class); - DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExt = new DefaultMQAdminExt(); defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); @@ -104,6 +105,9 @@ public class DefaultMQAdminExtTest { field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); field.setAccessible(true); field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); properties.setProperty("maxMessageSize", "5000000"); properties.setProperty("flushDelayOffsetInterval", "15000"); @@ -223,15 +227,15 @@ public class DefaultMQAdminExtTest { when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); } - @After - public void terminate() throws Exception { + @AfterClass + public static void terminate() throws Exception { if (defaultMQAdminExtImpl != null) - defaultMQAdminExtImpl.shutdown(); + defaultMQAdminExt.shutdown(); } @Test public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException { - Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911"); + Properties result = defaultMQAdminExt.getBrokerConfig("127.0.0.1:10911"); assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000"); assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000"); assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350"); @@ -239,21 +243,21 @@ public class DefaultMQAdminExtTest { @Test public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException { - TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList(); + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); assertThat(topicList.getTopicList().size()).isEqualTo(2); assertThat(topicList.getTopicList()).contains("topic_one"); } @Test public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911"); + KVTable brokerStats = defaultMQAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911"); assertThat(brokerStats.getTable().get("id")).isEqualTo("1234"); assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker"); } @Test public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo(); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); HashMap brokerList = clusterInfo.getBrokerAddrTable(); assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker"); assertThat(brokerList.containsKey("broker-test")).isTrue(); @@ -272,32 +276,32 @@ public class DefaultMQAdminExtTest { @Test public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - ConsumeStats consumeStats = defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", "unit-test"); + ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats("default-consumer-group", "unit-test"); assertThat(consumeStats.getConsumeTps()).isEqualTo(1234); } @Test public void testExamineConsumerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - ConsumerConnection consumerConnection = defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group"); + ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo("default-consumer-group"); assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY); assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING); } @Test public void testExamineProducerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - ProducerConnection producerConnection = defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", "unit-test"); + ProducerConnection producerConnection = defaultMQAdminExt.examineProducerConnectionInfo("default-producer-group", "unit-test"); assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1); } @Test public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException { - int result = defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker"); + int result = defaultMQAdminExt.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker"); assertThat(result).isEqualTo(6); } @Test public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException { - TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest"); + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo("UnitTest"); assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker"); assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster"); } @@ -308,53 +312,53 @@ public class DefaultMQAdminExtTest { result.add("default-name-one"); result.add("default-name-two"); when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result); - List nameList = defaultMQAdminExtImpl.getNameServerAddressList(); + List nameList = defaultMQAdminExt.getNameServerAddressList(); assertThat(nameList.get(0)).isEqualTo("default-name-one"); assertThat(nameList.get(1)).isEqualTo("default-name-two"); } @Test public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException { - String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest"); + String topicConfig = defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest"); assertThat(topicConfig).isEqualTo("topicListConfig"); - KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); + KVTable kvs = defaultMQAdminExt.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one"); assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster"); } @Test public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest"); + GroupList groupList = defaultMQAdminExt.queryTopicConsumeByWho("UnitTest"); assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue(); } @Test public void testQueryConsumeTimeSpan() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - List result = defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group"); + List result = defaultMQAdminExt.queryConsumeTimeSpan("unit-test", "default-broker-group"); assertThat(result.size()).isEqualTo(0); } @Test public void testCleanExpiredConsumerQueue() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { - boolean result = defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster"); + boolean result = defaultMQAdminExt.cleanExpiredConsumerQueue("default-cluster"); assertThat(result).isFalse(); } @Test public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { - boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911"); + boolean clean = defaultMQAdminExt.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911"); assertThat(clean).isTrue(); } @Test public void testCleanUnusedTopic() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { - boolean result = defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster"); + boolean result = defaultMQAdminExt.cleanUnusedTopic("default-cluster"); assertThat(result).isFalse(); } @Test public void testGetConsumerRunningInfo() throws RemotingException, MQClientException, InterruptedException { - ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", false); + ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo("consumer-group", "cid_123", false); assertThat(consumerRunningInfo.getJstack()).isEqualTo("test"); } @@ -363,25 +367,25 @@ public class DefaultMQAdminExtTest { MessageExt messageExt = new MessageExt(); messageExt.setMsgId("msgId"); messageExt.setTopic("unit-test"); - List messageTrackList = defaultMQAdminExtImpl.messageTrackDetail(messageExt); + List messageTrackList = defaultMQAdminExt.messageTrackDetail(messageExt); assertThat(messageTrackList.size()).isEqualTo(2); } @Test public void testGetConsumeStatus() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Map> result = defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911"); + Map> result = defaultMQAdminExt.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911"); assertThat(result.size()).isEqualTo(0); } @Test public void testGetTopicClusterList() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Set result = defaultMQAdminExtImpl.getTopicClusterList("unit-test"); + Set result = defaultMQAdminExt.getTopicClusterList("unit-test"); assertThat(result.size()).isEqualTo(0); } @Test public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { - Set clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest"); + Set clusterlist = defaultMQAdminExt.getClusterList("UnitTest"); assertThat(clusterlist.contains("default-cluster-one")).isTrue(); assertThat(clusterlist.contains("default-cluster-two")).isTrue(); } @@ -391,13 +395,13 @@ public class DefaultMQAdminExtTest { ConsumeStatsList result = new ConsumeStatsList(); result.setBrokerAddr("127.0.0.1:10911"); when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result); - ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000); + ConsumeStatsList consumeStatsList = defaultMQAdminExt.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000); assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911"); } @Test public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000); + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup("127.0.0.1:10911", 10000); assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234); assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one"); assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue(); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java index ba5801001896562ca8430cce2354c5fda80785d2..33b449768d105b6d223c84d6b46ae5d22beba916 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.tools.command; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -33,9 +39,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.lang.reflect.Field; -import java.util.*; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java new file mode 100644 index 0000000000000000000000000000000000000000..35231752c461ca9af1bfbe46eccabbaa600342f2 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BrokerConsumeStatsSubCommadTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + ConsumeStatsList consumeStatsList = new ConsumeStatsList(); + consumeStatsList.setBrokerAddr("127.0l.0.1:10911"); + consumeStatsList.setConsumeStatsList(new ArrayList>>()); + consumeStatsList.setTotalDiff(123); + when(mQClientAPIImpl.fetchConsumeStatsInBroker(anyString(), anyBoolean(), anyLong())).thenReturn(consumeStatsList); + } + + @AfterClass + public static void terminate() { + } + + @Test + public void testExecute() { + BrokerConsumeStatsSubCommad cmd = new BrokerConsumeStatsSubCommad(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-t 3000", "-l 5", "-o true"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } + +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1b08735c913c9fb3ab9d0be01f0e6bc0c868585a --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import java.util.HashMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BrokerStatusSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + KVTable kvTable = new KVTable(); + kvTable.setTable(new HashMap()); + when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + BrokerStatusSubCommand cmd = new BrokerStatusSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } + +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6fcf044d9d4c03f484988edf970789f6531df51c --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CleanExpiredCQSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + CleanExpiredCQSubCommand cmd = new CleanExpiredCQSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } + +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..3ae2c48b79da2b5843f4b25de7e35e3261765db6 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CleanUnusedTopicCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + when(mQClientAPIImpl.cleanUnusedTopicByAddr(anyString(), anyLong())).thenReturn(true); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + CleanUnusedTopicCommand cmd = new CleanUnusedTopicCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } + +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..88a8ea8ba0dafb6234b7ef858c003d3364e65605 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import java.util.Properties; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GetBrokerConfigCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + Properties properties = new Properties(); + properties.setProperty("maxMessageSize", "5000000"); + properties.setProperty("flushDelayOffsetInterval", "15000"); + properties.setProperty("serverSocketRcvBufSize", "655350"); + when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + GetBrokerConfigCommand cmd = new GetBrokerConfigCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..9089a39b1d2c56340216ab50bd0a0e66eb5df519 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class SendMsgStatusCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + SendMsgStatusCommand cmd = new SendMsgStatusCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-s 1024 -c 10"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + //cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..cc459ba08761c5265758727fef62edcd4e9a5a48 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class UpdateBrokerConfigSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + UpdateBrokerConfigSubCommand cmd = new UpdateBrokerConfigSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-k topicname", "-v unit_test"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..88530e22a5c7d825ba3e385ef0bea3504387f5ef --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.connection; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConsumerConnectionSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + ConsumerConnection consumerConnection = new ConsumerConnection(); + consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY); + consumerConnection.setMessageModel(MessageModel.CLUSTERING); + HashSet connections = new HashSet<>(); + connections.add(new Connection()); + consumerConnection.setConnectionSet(connections); + consumerConnection.setSubscriptionTable(new ConcurrentHashMap()); + consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + ConsumerConnectionSubCommand cmd = new ConsumerConnectionSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-consumer-group"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8df66fb1d081c536906e443a41171061cf7ce31e --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.connection; + +import java.lang.reflect.Field; +import java.util.HashSet; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ProducerConnectionSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + ProducerConnection producerConnection = new ProducerConnection(); + Connection connection = new Connection(); + connection.setClientAddr("127.0.0.1:9898"); + connection.setClientId("PID_12345"); + HashSet connectionSet = new HashSet<>(); + connectionSet.add(connection); + producerConnection.setConnectionSet(connectionSet); + when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + ProducerConnectionSubCommand cmd = new ProducerConnectionSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-producer-group", "-t unit-test"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a5af04a6ca5b7b5074bdbdb7d8d93ed08de740f6 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.consumer; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConsumerProgressSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + TopicRouteData topicRouteData = new TopicRouteData(); + List brokerDatas = new ArrayList<>(); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(1234l, "127.0.0.1:10911"); + BrokerData brokerData = new BrokerData(); + brokerData.setCluster("default-cluster"); + brokerData.setBrokerName("default-broker"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDatas.add(brokerData); + topicRouteData.setBrokerDatas(brokerDatas); + topicRouteData.setQueueDatas(new ArrayList()); + topicRouteData.setFilterServerTable(new HashMap>()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); + + ConsumeStats consumeStats = new ConsumeStats(); + consumeStats.setConsumeTps(1234); + MessageQueue messageQueue = new MessageQueue(); + OffsetWrapper offsetWrapper = new OffsetWrapper(); + HashMap stats = new HashMap<>(); + stats.put(messageQueue, offsetWrapper); + consumeStats.setOffsetTable(stats); + when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + ConsumerProgressSubCommand cmd = new ConsumerProgressSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-group"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8e846bcf243b7580085e612ec202733e45ca855f --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.consumer; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumeStatus; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConsumerStatusSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + TopicRouteData topicRouteData = new TopicRouteData(); + List brokerDatas = new ArrayList<>(); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(1234l, "127.0.0.1:10911"); + BrokerData brokerData = new BrokerData(); + brokerData.setCluster("default-cluster"); + brokerData.setBrokerName("default-broker"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDatas.add(brokerData); + topicRouteData.setBrokerDatas(brokerDatas); + topicRouteData.setQueueDatas(new ArrayList()); + topicRouteData.setFilterServerTable(new HashMap>()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); + + ConsumerConnection consumerConnection = new ConsumerConnection(); + consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY); + consumerConnection.setMessageModel(MessageModel.CLUSTERING); + HashSet connections = new HashSet<>(); + connections.add(new Connection()); + consumerConnection.setConnectionSet(connections); + consumerConnection.setSubscriptionTable(new ConcurrentHashMap()); + consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection); + + ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo(); + consumerRunningInfo.setJstack("test"); + consumerRunningInfo.setMqTable(new TreeMap()); + consumerRunningInfo.setStatusTable(new TreeMap()); + consumerRunningInfo.setSubscriptionSet(new TreeSet()); + when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + ConsumerStatusSubCommand cmd = new ConsumerStatusSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-group", "-i cid_one"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..49802b91da91e236bcff44ad3a84a96f3695d5dc --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.namesrv; + +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentMatchers; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GetNamesrvConfigCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + Map propertiesMap = new HashMap<>(); + List nameServers = new ArrayList<>(); + when(mQClientAPIImpl.getNameServerConfig(ArgumentMatchers.anyList(), anyLong())).thenReturn(propertiesMap); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + GetNamesrvConfigCommand cmd = new GetNamesrvConfigCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..5d2781ad98abcf2e03ecd204776c7239e408288e --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.namesrv; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class WipeWritePermSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, RemotingCommandException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + List result = new ArrayList<>(); + result.add("default-name-one"); + result.add("default-name-two"); + when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result); + when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + WipeWritePermSubCommand cmd = new WipeWritePermSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b default-broker"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..07dda80ff7c8f2b3ff7996d488c66e78759cf80a --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.offset; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GetConsumerStatusCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + Map> consumerStatus = new HashMap<>(); + when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + GetConsumerStatusCommand cmd = new GetConsumerStatusCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-group", "-t unit-test", "-i clientid"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..de32660f235eb46eac365559ad8b11d28a1a7d2b --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.offset; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ResetOffsetByTimeCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + TopicRouteData topicRouteData = new TopicRouteData(); + List brokerDatas = new ArrayList<>(); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(1234l, "127.0.0.1:10911"); + BrokerData brokerData = new BrokerData(); + brokerData.setCluster("default-cluster"); + brokerData.setBrokerName("default-broker"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDatas.add(brokerData); + topicRouteData.setBrokerDatas(brokerDatas); + topicRouteData.setQueueDatas(new ArrayList()); + topicRouteData.setFilterServerTable(new HashMap>()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); + + Map messageQueueLongMap = new HashMap<>(); + when(mQClientAPIImpl.invokeBrokerToResetOffset(anyString(), anyString(), anyString(), anyLong(), anyBoolean(), anyLong())).thenReturn(messageQueueLongMap); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-group", "-t unit-test", "-s 1412131213231", "-f false"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c172c7edb82bd4108c0a571349c2db47586445dc --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.offset; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ResetOffsetByTimeOldCommandTest { + @Test + public void testExecute() { + ResetOffsetByTimeOldCommand cmd = new ResetOffsetByTimeOldCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-group", "-t unit-test", "-s 1412131213231", "-f false"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("default-group"); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); + assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("1412131213231"); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f3091419cd664378ed86d9f737e8ab844dcf7ef8 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class AllocateMQSubCommandTest { + @Test + public void testExecute() { + AllocateMQSubCommand cmd = new AllocateMQSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t unit-test", "-i 127.0.0.1:10911"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); + assertThat(commandLine.getOptionValue("i").trim()).isEqualTo("127.0.0.1:10911"); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4539c0a09ffc64175ba12adfdddee1ec8c16dd49 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class DeleteTopicSubCommandTest { + @Test + public void testExecute() { + DeleteTopicSubCommand cmd = new DeleteTopicSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t unit-test", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); + assertThat(commandLine.getOptionValue("c").trim()).isEqualTo("default-cluster"); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4a1bd5fd8da19521dac3c642c4ecceb4d39944b3 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TopicClusterSubCommandTest { + @Test + public void testExecute() { + TopicClusterSubCommand cmd = new TopicClusterSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t unit-test"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8c02ddaa90060d058dfc98d3f46dbcabe87ec5cb --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TopicRouteSubCommandTest { + @Test + public void testExecute() { + TopicRouteSubCommand cmd = new TopicRouteSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t unit-test"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..3b389a674ad0fb4fca83a7bcbf79c89ff0131ea6 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TopicStatusSubCommandTest { + @Test + public void testExecute() { + TopicStatusSubCommand cmd = new TopicStatusSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t unit-test"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..632e9b62e2276a9988dd213527951e8c69e75e52 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class UpdateOrderConfCommandTest { + @Test + public void testExecute() { + UpdateOrderConfCommand cmd = new UpdateOrderConfCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t unit-test", "-v default-broker:8", "-m post"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); + assertThat(commandLine.getOptionValue('v').trim()).isEqualTo("default-broker:8"); + assertThat(commandLine.getOptionValue('m').trim()).isEqualTo("post"); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f147a5523d17588f817290f6a07cba809ef6d6ae --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class UpdateTopicPermSubCommandTest { + @Test + public void testExecute() { + UpdateTopicPermSubCommand cmd = new UpdateTopicPermSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-t unit-test", "-p 6"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911"); + assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster"); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); + assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6"); + + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..5ea03d6584bbb1d57b716d4528ebb2b8862db844 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class UpdateTopicSubCommandTest { + @Test + public void testExecute() { + UpdateTopicSubCommand cmd = new UpdateTopicSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-b 127.0.0.1:10911", + "-c default-cluster", + "-t unit-test", + "-r 8", + "-w 8", + "-p 6", + "-o false", + "-u false", + "-s false"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911"); + assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster"); + assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8"); + assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8"); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); + assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6"); + assertThat(commandLine.getOptionValue('o').trim()).isEqualTo("false"); + assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("false"); + assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("false"); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java b/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f6f879481325ef7814a86c8c03713be7f16beec7 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.monitor; + +import java.util.Properties; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.ConsumeStatus; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class DefaultMonitorListenerTest { + private DefaultMonitorListener defaultMonitorListener; + + @Before + public void init() { + defaultMonitorListener = mock(DefaultMonitorListener.class); + } + + @Test + public void testBeginRound() { + defaultMonitorListener.beginRound(); + } + + @Test + public void testReportUndoneMsgs() { + UndoneMsgs undoneMsgs = new UndoneMsgs(); + undoneMsgs.setConsumerGroup("default-group"); + undoneMsgs.setTopic("unit-test"); + undoneMsgs.setUndoneMsgsDelayTimeMills(30000); + undoneMsgs.setUndoneMsgsSingleMQ(1); + undoneMsgs.setUndoneMsgsTotal(100); + defaultMonitorListener.reportUndoneMsgs(undoneMsgs); + } + + @Test + public void testReportFailedMsgs() { + FailedMsgs failedMsgs = new FailedMsgs(); + failedMsgs.setTopic("unit-test"); + failedMsgs.setConsumerGroup("default-consumer"); + failedMsgs.setFailedMsgsTotalRecently(2); + defaultMonitorListener.reportFailedMsgs(failedMsgs); + } + + @Test + public void testReportDeleteMsgsEvent() { + DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent(); + deleteMsgsEvent.setEventTimestamp(System.currentTimeMillis()); + deleteMsgsEvent.setOffsetMovedEvent(new OffsetMovedEvent()); + defaultMonitorListener.reportDeleteMsgsEvent(deleteMsgsEvent); + } + + @Test + public void testReportConsumerRunningInfo() { + TreeMap criTable = new TreeMap<>(); + ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo(); + consumerRunningInfo.setSubscriptionSet(new TreeSet()); + consumerRunningInfo.setStatusTable(new TreeMap()); + consumerRunningInfo.setSubscriptionSet(new TreeSet()); + consumerRunningInfo.setMqTable(new TreeMap()); + consumerRunningInfo.setProperties(new Properties()); + criTable.put("test", consumerRunningInfo); + defaultMonitorListener.reportConsumerRunningInfo(criTable); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4989a9b55ab37920a97415fbae24de81a7334300 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.monitor; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumeStatus; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MonitorServiceTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + private static MonitorConfig monitorConfig; + private static MonitorListener monitorListener; + private static DefaultMQPullConsumer defaultMQPullConsumer; + private static DefaultMQPushConsumer defaultMQPushConsumer; + private static MonitorService monitorService; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, RemotingException, MQClientException, InterruptedException, MQBrokerException { + monitorConfig = new MonitorConfig(); + monitorListener = new DefaultMonitorListener(); + defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); + defaultMQPushConsumer = mock(DefaultMQPushConsumer.class); + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + monitorService = new MonitorService(monitorConfig, monitorListener, null); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + field = MonitorService.class.getDeclaredField("defaultMQAdminExt"); + field.setAccessible(true); + field.set(monitorService, defaultMQAdminExt); + field = MonitorService.class.getDeclaredField("defaultMQPullConsumer"); + field.setAccessible(true); + field.set(monitorService, defaultMQPullConsumer); + field = MonitorService.class.getDeclaredField("defaultMQPushConsumer"); + field.setAccessible(true); + field.set(monitorService, defaultMQPushConsumer); + + TopicList topicList = new TopicList(); + Set topicSet = new HashSet<>(); + topicSet.add("topic_one"); + topicSet.add("topic_two"); + topicList.setTopicList(topicSet); + when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList); + + TopicRouteData topicRouteData = new TopicRouteData(); + List brokerDatas = new ArrayList<>(); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(1234l, "127.0.0.1:10911"); + BrokerData brokerData = new BrokerData(); + brokerData.setCluster("default-cluster"); + brokerData.setBrokerName("default-broker"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDatas.add(brokerData); + topicRouteData.setBrokerDatas(brokerDatas); + topicRouteData.setQueueDatas(new ArrayList()); + topicRouteData.setFilterServerTable(new HashMap>()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); + + ConsumeStats consumeStats = new ConsumeStats(); + consumeStats.setConsumeTps(1234); + MessageQueue messageQueue = new MessageQueue(); + OffsetWrapper offsetWrapper = new OffsetWrapper(); + HashMap stats = new HashMap<>(); + stats.put(messageQueue, offsetWrapper); + consumeStats.setOffsetTable(stats); + when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats); + + ConsumerConnection consumerConnection = new ConsumerConnection(); + consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY); + consumerConnection.setMessageModel(MessageModel.CLUSTERING); + HashSet connections = new HashSet<>(); + Connection connection = new Connection(); + connection.setClientId("client_id"); + connection.setClientAddr("127.0.0.1:109111"); + connection.setLanguage(LanguageCode.JAVA); + connection.setVersion(MQVersion.Version.V4_0_0_SNAPSHOT.ordinal()); + connections.add(connection); + consumerConnection.setConnectionSet(connections); + consumerConnection.setSubscriptionTable(new ConcurrentHashMap()); + consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection); + + ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo(); + consumerRunningInfo.setJstack("test"); + consumerRunningInfo.setMqTable(new TreeMap()); + consumerRunningInfo.setStatusTable(new TreeMap()); + consumerRunningInfo.setSubscriptionSet(new TreeSet()); + Properties properties = new Properties(); + properties.put(ConsumerRunningInfo.PROP_CONSUME_TYPE, CONSUME_ACTIVELY); + properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, System.currentTimeMillis()); + consumerRunningInfo.setProperties(properties); + when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); + } + + @AfterClass + public static void terminate() { + } + + @Test + public void testDoMonitorWork() throws RemotingException, MQClientException, InterruptedException { + monitorService.doMonitorWork(); + } + + @Test + public void testReportConsumerRunningInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + monitorService.reportConsumerRunningInfo("test_group"); + } +} \ No newline at end of file