diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 5964cd1a9e48e7d02fd61a754837197290c6e5f7..da3bd8c9e0aac245e398743402bb48db1764285e 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -22,15 +22,21 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.admin.*; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.namesrv.NamesrvUtil; import org.apache.rocketmq.common.protocol.body.*; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.*; +import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -43,8 +49,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -93,6 +98,8 @@ public class DefaultMQAdminExtTest { brokerData.setBrokerAddrs(brokerAddrs); brokerDatas.add(brokerData); topicRouteData.setBrokerDatas(brokerDatas); + topicRouteData.setQueueDatas(new ArrayList()); + topicRouteData.setFilterServerTable(new HashMap>()); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); HashMap result = new HashMap<>(); @@ -105,6 +112,7 @@ public class DefaultMQAdminExtTest { brokerAddrTable.put("default-broker", brokerData); brokerAddrTable.put("broker-test", new BrokerData()); clusterInfo.setBrokerAddrTable(brokerAddrTable); + clusterInfo.setClusterAddrTable(new HashMap>()); when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo); when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true); @@ -140,6 +148,52 @@ public class DefaultMQAdminExtTest { kv.put("cluster-name", "default-cluster"); kvTable.setTable(kv); when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable); + + ConsumeStats consumeStats = new ConsumeStats(); + consumeStats.setConsumeTps(1234); + MessageQueue messageQueue = new MessageQueue(); + OffsetWrapper offsetWrapper = new OffsetWrapper(); + HashMap stats = new HashMap<>(); + stats.put(messageQueue, offsetWrapper); + consumeStats.setOffsetTable(stats); + when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats); + + ConsumerConnection consumerConnection = new ConsumerConnection(); + consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY); + consumerConnection.setMessageModel(MessageModel.CLUSTERING); + HashSet connections = new HashSet<>(); + connections.add(new Connection()); + consumerConnection.setConnectionSet(connections); + consumerConnection.setSubscriptionTable(new ConcurrentHashMap()); + consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection); + + ProducerConnection producerConnection = new ProducerConnection(); + Connection connection = new Connection(); + connection.setClientAddr("127.0.0.1:9898"); + connection.setClientId("PID_12345"); + HashSet connectionSet = new HashSet(); + connectionSet.add(connection); + producerConnection.setConnectionSet(connectionSet); + when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection); + + when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6); + + TopicStatsTable topicStatsTable = new TopicStatsTable(); + topicStatsTable.setOffsetTable(new HashMap()); + + Map> consumerStatus = new HashMap<>(); + when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus); + + List queueTimeSpanList = new ArrayList<>(); + when(mQClientAPIImpl.queryConsumeTimeSpan(anyString(), anyString(), anyString(), anyLong())).thenReturn(queueTimeSpanList); + + ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo(); + consumerRunningInfo.setJstack("test"); + consumerRunningInfo.setMqTable(new TreeMap()); + consumerRunningInfo.setStatusTable(new TreeMap()); + consumerRunningInfo.setSubscriptionSet(new TreeSet()); + when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); } @After @@ -190,6 +244,31 @@ public class DefaultMQAdminExtTest { assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2); } + @Test + public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + ConsumeStats consumeStats = defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", "unit-test"); + assertThat(consumeStats.getConsumeTps()).isEqualTo(1234); + } + + @Test + public void testExamineConsumerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + ConsumerConnection consumerConnection = defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group"); + assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY); + assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING); + } + + @Test + public void testExamineProducerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + ProducerConnection producerConnection = defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", "unit-test"); + assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1); + } + + @Test + public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException { + int result = defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker"); + assertThat(result).isEqualTo(6); + } + @Test public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException { TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest"); @@ -224,12 +303,57 @@ public class DefaultMQAdminExtTest { assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue(); } + @Test + public void testQueryConsumeTimeSpan() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + List result = defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group"); + assertThat(result.size()).isEqualTo(0); + } + + @Test + public void testCleanExpiredConsumerQueue() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + boolean result = defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster"); + assertThat(result).isFalse(); + } + @Test public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911"); assertThat(clean).isTrue(); } + @Test + public void testCleanUnusedTopic() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + boolean result = defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster"); + assertThat(result).isFalse(); + } + + @Test + public void testGetConsumerRunningInfo() throws RemotingException, MQClientException, InterruptedException { + ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", false); + assertThat(consumerRunningInfo.getJstack()).isEqualTo("test"); + } + + @Test + public void testMessageTrackDetail() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + MessageExt messageExt = new MessageExt(); + messageExt.setMsgId("msgId"); + messageExt.setTopic("unit-test"); + List messageTrackList = defaultMQAdminExtImpl.messageTrackDetail(messageExt); + assertThat(messageTrackList.size()).isEqualTo(2); + } + + @Test + public void testGetConsumeStatus() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + Map> result = defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911"); + assertThat(result.size()).isEqualTo(0); + } + + @Test + public void testGetTopicClusterList() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + Set result = defaultMQAdminExtImpl.getTopicClusterList("unit-test"); + assertThat(result.size()).isEqualTo(0); + } + @Test public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { Set clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");