diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ab6319ff8596a4e24dd254f442e49976de24420a --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -0,0 +1,240 @@ +package org.apache.rocketmq.tools.admin; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.namesrv.NamesrvUtil; +import org.apache.rocketmq.common.protocol.body.*; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultMQAdminExtTest { + private DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientAPIImpl mQClientAPIImpl; + private Properties properties = new Properties(); + private TopicList topicList = new TopicList(); + private TopicRouteData topicRouteData = new TopicRouteData(); + private KVTable kvTable = new KVTable(); + private ClusterInfo clusterInfo = new ClusterInfo(); + + @Before + public void init() throws Exception { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + + properties.setProperty("maxMessageSize", "5000000"); + properties.setProperty("flushDelayOffsetInterval", "15000"); + properties.setProperty("serverSocketRcvBufSize", "655350"); + when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties); + + Set topicSet = new HashSet<>(); + topicSet.add("topic_one"); + topicSet.add("topic_two"); + topicList.setTopicList(topicSet); + when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList); + + + List brokerDatas = new ArrayList<>(); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(1234l, "127.0.0.1:10911"); + BrokerData brokerData = new BrokerData(); + brokerData.setCluster("default-cluster"); + brokerData.setBrokerName("default-broker"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDatas.add(brokerData); + topicRouteData.setBrokerDatas(brokerDatas); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); + + HashMap result = new HashMap<>(); + result.put("id", "1234"); + result.put("brokerName", "default-broker"); + kvTable.setTable(result); + when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable); + + HashMap brokerAddrTable = new HashMap<>(); + brokerAddrTable.put("default-broker", brokerData); + brokerAddrTable.put("broker-test", new BrokerData()); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo); + when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true); + + Set clusterList = new HashSet<>(); + clusterList.add("default-cluster-one"); + clusterList.add("default-cluster-two"); + when(mQClientAPIImpl.getClusterList(anyString(), anyLong())).thenReturn(clusterList); + + GroupList groupList = new GroupList(); + HashSet groups = new HashSet<>(); + groups.add("consumer-group-one"); + groups.add("consumer-group-two"); + groupList.setGroupList(groups); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); + when(mQClientAPIImpl.queryTopicConsumeByWho(anyString(), anyString(), anyLong())).thenReturn(groupList); + + SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper(); + ConcurrentHashMap subscriptions = new ConcurrentHashMap<>(); + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setConsumeBroadcastEnable(true); + subscriptionGroupConfig.setBrokerId(1234); + subscriptionGroupConfig.setGroupName("Consumer-group-one"); + subscriptions.put("Consumer-group-one", subscriptionGroupConfig); + subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions); + when(mQClientAPIImpl.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(subscriptionGroupWrapper); + + String topicListConfig = "topicListConfig"; + when(mQClientAPIImpl.getKVConfigValue(anyString(), anyString(), anyLong())).thenReturn(topicListConfig); + + KVTable kvTable = new KVTable(); + HashMap kv = new HashMap<>(); + kv.put("broker-name", "broker-one"); + kv.put("cluster-name", "default-cluster"); + kvTable.setTable(kv); + when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable); + } + + @After + public void terminate() throws Exception { + if (defaultMQAdminExtImpl != null) + defaultMQAdminExtImpl.shutdown(); + } + + @Test + public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException { + Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911"); + assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000"); + assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000"); + assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350"); + } + + + @Test + public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException { + TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList(); + assertThat(topicList.getTopicList().size()).isEqualTo(2); + assertThat(topicList.getTopicList()).contains("topic_one"); + } + + @Test + public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911"); + assertThat(brokerStats.getTable().get("id")).isEqualTo("1234"); + assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker"); + } + + @Test + public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo(); + HashMap brokerList = clusterInfo.getBrokerAddrTable(); + assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker"); + assertThat(brokerList.containsKey("broker-test")).isTrue(); + + HashMap> clusterMap = new HashMap<>(); + Set brokers = new HashSet<>(); + brokers.add("default-broker"); + brokers.add("broker-test"); + clusterMap.put("default-cluster", brokers); + ClusterInfo cInfo = mock(ClusterInfo.class); + when(cInfo.getClusterAddrTable()).thenReturn(clusterMap); + HashMap> clusterAddress = cInfo.getClusterAddrTable(); + assertThat(clusterAddress.containsKey("default-cluster")).isTrue(); + assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2); + } + + @Test + public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException { + TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest"); + assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker"); + assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster"); + } + + @Test + public void testGetNameServerAddressList() { + List result = new ArrayList<>(); + result.add("default-name-one"); + result.add("default-name-two"); + when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result); + List nameList = defaultMQAdminExtImpl.getNameServerAddressList(); + assertThat(nameList.get(0)).isEqualTo("default-name-one"); + assertThat(nameList.get(1)).isEqualTo("default-name-two"); + } + + @Test + public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException { + String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest"); + assertThat(topicConfig).isEqualTo("topicListConfig"); + KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); + assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one"); + assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster"); + } + + + @Test + public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest"); + assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue(); + } + + @Test + public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911"); + assertThat(clean).isTrue(); + } + + @Test + public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + Set clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest"); + assertThat(clusterlist.contains("default-cluster-one")).isTrue(); + assertThat(clusterlist.contains("default-cluster-two")).isTrue(); + } + + @Test + public void testFetchConsumeStatsInBroker() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + ConsumeStatsList result = new ConsumeStatsList(); + result.setBrokerAddr("127.0.0.1:10911"); + when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result); + ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000); + assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911"); + } + + @Test + public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000); + assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234); + assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one"); + assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue(); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java new file mode 100644 index 0000000000000000000000000000000000000000..35c205e34996813c4b027ad30bf18bbf3080ea35 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java @@ -0,0 +1,92 @@ +package org.apache.rocketmq.tools.command; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.*; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CommandUtilTest { + private DefaultMQAdminExt defaultMQAdminExt; + private DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientAPIImpl mQClientAPIImpl; + + @Before + public void setup() throws MQClientException, NoSuchFieldException, IllegalAccessException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + defaultMQAdminExt = mock(DefaultMQAdminExt.class); + MQClientAPIImpl mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 3000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + + ClusterInfo clusterInfo = new ClusterInfo(); + HashMap brokerAddrTable = new HashMap<>(); + HashMap> clusterAddrTable = new HashMap<>(); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(1234l, "127.0.0.1:10911"); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("default-broker"); + brokerData.setCluster("default-cluster"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerAddrTable.put("default-broker", brokerData); + brokerAddrTable.put("broker-test", new BrokerData()); + Set brokerSet = new HashSet<>(); + brokerSet.add("default-broker"); + brokerSet.add("default-broker-one"); + clusterAddrTable.put("default-cluster", brokerSet); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + clusterInfo.setClusterAddrTable(clusterAddrTable); + when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo); + when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true); + } + + @After + public void shutdown() throws Exception { + } + + @Test + public void testFetchMasterAndSlaveDistinguish() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + Map> result = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExtImpl, "default-cluster"); + assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911"); + } + + @Test + public void testFetchMasterAddrByClusterName() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + Set result = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExtImpl, "default-cluster"); + assertThat(result.size()).isEqualTo(0); + } + + @Test + public void testFetchBrokerNameByClusterName() throws Exception { + Set result = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExtImpl, "default-cluster"); + assertThat(result.contains("default-broker")).isTrue(); + assertThat(result.contains("default-broker-one")).isTrue(); + assertThat(result.size()).isEqualTo(2); + } +} \ No newline at end of file