提交 f8d881ba 编写于 作者: S stevenschew

[ROCKETMQ-57] Add unit test for DefaultMQAdminExt

上级 0de84e20
...@@ -22,15 +22,21 @@ import org.apache.rocketmq.client.exception.MQClientException; ...@@ -22,15 +22,21 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.admin.*;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.NamesrvUtil; import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.protocol.body.*; import org.apache.rocketmq.common.protocol.body.*;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.*;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -43,8 +49,7 @@ import java.util.*; ...@@ -43,8 +49,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
...@@ -93,6 +98,8 @@ public class DefaultMQAdminExtTest { ...@@ -93,6 +98,8 @@ public class DefaultMQAdminExtTest {
brokerData.setBrokerAddrs(brokerAddrs); brokerData.setBrokerAddrs(brokerAddrs);
brokerDatas.add(brokerData); brokerDatas.add(brokerData);
topicRouteData.setBrokerDatas(brokerDatas); topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList<QueueData>());
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
HashMap<String, String> result = new HashMap<>(); HashMap<String, String> result = new HashMap<>();
...@@ -105,6 +112,7 @@ public class DefaultMQAdminExtTest { ...@@ -105,6 +112,7 @@ public class DefaultMQAdminExtTest {
brokerAddrTable.put("default-broker", brokerData); brokerAddrTable.put("default-broker", brokerData);
brokerAddrTable.put("broker-test", new BrokerData()); brokerAddrTable.put("broker-test", new BrokerData());
clusterInfo.setBrokerAddrTable(brokerAddrTable); clusterInfo.setBrokerAddrTable(brokerAddrTable);
clusterInfo.setClusterAddrTable(new HashMap<String, Set<String>>());
when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo); when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true); when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
...@@ -140,6 +148,52 @@ public class DefaultMQAdminExtTest { ...@@ -140,6 +148,52 @@ public class DefaultMQAdminExtTest {
kv.put("cluster-name", "default-cluster"); kv.put("cluster-name", "default-cluster");
kvTable.setTable(kv); kvTable.setTable(kv);
when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable); when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
ConsumeStats consumeStats = new ConsumeStats();
consumeStats.setConsumeTps(1234);
MessageQueue messageQueue = new MessageQueue();
OffsetWrapper offsetWrapper = new OffsetWrapper();
HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
stats.put(messageQueue, offsetWrapper);
consumeStats.setOffsetTable(stats);
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats);
ConsumerConnection consumerConnection = new ConsumerConnection();
consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
consumerConnection.setMessageModel(MessageModel.CLUSTERING);
HashSet<Connection> connections = new HashSet<>();
connections.add(new Connection());
consumerConnection.setConnectionSet(connections);
consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
ProducerConnection producerConnection = new ProducerConnection();
Connection connection = new Connection();
connection.setClientAddr("127.0.0.1:9898");
connection.setClientId("PID_12345");
HashSet<Connection> connectionSet = new HashSet<Connection>();
connectionSet.add(connection);
producerConnection.setConnectionSet(connectionSet);
when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection);
when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6);
TopicStatsTable topicStatsTable = new TopicStatsTable();
topicStatsTable.setOffsetTable(new HashMap<MessageQueue, TopicOffset>());
Map<String, Map<MessageQueue, Long>> consumerStatus = new HashMap<>();
when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus);
List<QueueTimeSpan> queueTimeSpanList = new ArrayList<>();
when(mQClientAPIImpl.queryConsumeTimeSpan(anyString(), anyString(), anyString(), anyLong())).thenReturn(queueTimeSpanList);
ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
consumerRunningInfo.setJstack("test");
consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>());
consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
} }
@After @After
...@@ -190,6 +244,31 @@ public class DefaultMQAdminExtTest { ...@@ -190,6 +244,31 @@ public class DefaultMQAdminExtTest {
assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2); assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2);
} }
@Test
public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
ConsumeStats consumeStats = defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", "unit-test");
assertThat(consumeStats.getConsumeTps()).isEqualTo(1234);
}
@Test
public void testExamineConsumerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
ConsumerConnection consumerConnection = defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group");
assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY);
assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
}
@Test
public void testExamineProducerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
ProducerConnection producerConnection = defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", "unit-test");
assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1);
}
@Test
public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException {
int result = defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker");
assertThat(result).isEqualTo(6);
}
@Test @Test
public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException { public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest"); TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
...@@ -224,12 +303,57 @@ public class DefaultMQAdminExtTest { ...@@ -224,12 +303,57 @@ public class DefaultMQAdminExtTest {
assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue(); assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
} }
@Test
public void testQueryConsumeTimeSpan() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
List<QueueTimeSpan> result = defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group");
assertThat(result.size()).isEqualTo(0);
}
@Test
public void testCleanExpiredConsumerQueue() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
boolean result = defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster");
assertThat(result).isFalse();
}
@Test @Test
public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911"); boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
assertThat(clean).isTrue(); assertThat(clean).isTrue();
} }
@Test
public void testCleanUnusedTopic() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
boolean result = defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster");
assertThat(result).isFalse();
}
@Test
public void testGetConsumerRunningInfo() throws RemotingException, MQClientException, InterruptedException {
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", false);
assertThat(consumerRunningInfo.getJstack()).isEqualTo("test");
}
@Test
public void testMessageTrackDetail() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
MessageExt messageExt = new MessageExt();
messageExt.setMsgId("msgId");
messageExt.setTopic("unit-test");
List<MessageTrack> messageTrackList = defaultMQAdminExtImpl.messageTrackDetail(messageExt);
assertThat(messageTrackList.size()).isEqualTo(2);
}
@Test
public void testGetConsumeStatus() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Map<String, Map<MessageQueue, Long>> result = defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911");
assertThat(result.size()).isEqualTo(0);
}
@Test
public void testGetTopicClusterList() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Set<String> result = defaultMQAdminExtImpl.getTopicClusterList("unit-test");
assertThat(result.size()).isEqualTo(0);
}
@Test @Test
public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest"); Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册