提交 626315ab 编写于 作者: S stevenschew

[ROCKETMQ-57] Polish unit tests for rocketmq-tools

上级 0f9f55f0
package org.apache.rocketmq.tools.admin;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.protocol.body.*;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQAdminExtTest {
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientAPIImpl mQClientAPIImpl;
private Properties properties = new Properties();
private TopicList topicList = new TopicList();
private TopicRouteData topicRouteData = new TopicRouteData();
private KVTable kvTable = new KVTable();
private ClusterInfo clusterInfo = new ClusterInfo();
@Before
public void init() throws Exception {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
properties.setProperty("maxMessageSize", "5000000");
properties.setProperty("flushDelayOffsetInterval", "15000");
properties.setProperty("serverSocketRcvBufSize", "655350");
when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
Set<String> topicSet = new HashSet<>();
topicSet.add("topic_one");
topicSet.add("topic_two");
topicList.setTopicList(topicSet);
when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
List<BrokerData> brokerDatas = new ArrayList<>();
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(1234l, "127.0.0.1:10911");
BrokerData brokerData = new BrokerData();
brokerData.setCluster("default-cluster");
brokerData.setBrokerName("default-broker");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDatas.add(brokerData);
topicRouteData.setBrokerDatas(brokerDatas);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
HashMap<String, String> result = new HashMap<>();
result.put("id", "1234");
result.put("brokerName", "default-broker");
kvTable.setTable(result);
when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
brokerAddrTable.put("default-broker", brokerData);
brokerAddrTable.put("broker-test", new BrokerData());
clusterInfo.setBrokerAddrTable(brokerAddrTable);
when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
Set<String> clusterList = new HashSet<>();
clusterList.add("default-cluster-one");
clusterList.add("default-cluster-two");
when(mQClientAPIImpl.getClusterList(anyString(), anyLong())).thenReturn(clusterList);
GroupList groupList = new GroupList();
HashSet<String> groups = new HashSet<>();
groups.add("consumer-group-one");
groups.add("consumer-group-two");
groupList.setGroupList(groups);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
when(mQClientAPIImpl.queryTopicConsumeByWho(anyString(), anyString(), anyLong())).thenReturn(groupList);
SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper();
ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptions = new ConcurrentHashMap<>();
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
subscriptionGroupConfig.setBrokerId(1234);
subscriptionGroupConfig.setGroupName("Consumer-group-one");
subscriptions.put("Consumer-group-one", subscriptionGroupConfig);
subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions);
when(mQClientAPIImpl.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(subscriptionGroupWrapper);
String topicListConfig = "topicListConfig";
when(mQClientAPIImpl.getKVConfigValue(anyString(), anyString(), anyLong())).thenReturn(topicListConfig);
KVTable kvTable = new KVTable();
HashMap<String, String> kv = new HashMap<>();
kv.put("broker-name", "broker-one");
kv.put("cluster-name", "default-cluster");
kvTable.setTable(kv);
when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
}
@After
public void terminate() throws Exception {
if (defaultMQAdminExtImpl != null)
defaultMQAdminExtImpl.shutdown();
}
@Test
public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911");
assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000");
assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000");
assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350");
}
@Test
public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList();
assertThat(topicList.getTopicList().size()).isEqualTo(2);
assertThat(topicList.getTopicList()).contains("topic_one");
}
@Test
public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911");
assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
}
@Test
public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo();
HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
assertThat(brokerList.containsKey("broker-test")).isTrue();
HashMap<String, Set<String>> clusterMap = new HashMap<>();
Set<String> brokers = new HashSet<>();
brokers.add("default-broker");
brokers.add("broker-test");
clusterMap.put("default-cluster", brokers);
ClusterInfo cInfo = mock(ClusterInfo.class);
when(cInfo.getClusterAddrTable()).thenReturn(clusterMap);
HashMap<String, Set<String>> clusterAddress = cInfo.getClusterAddrTable();
assertThat(clusterAddress.containsKey("default-cluster")).isTrue();
assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2);
}
@Test
public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster");
}
@Test
public void testGetNameServerAddressList() {
List<String> result = new ArrayList<>();
result.add("default-name-one");
result.add("default-name-two");
when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
List<String> nameList = defaultMQAdminExtImpl.getNameServerAddressList();
assertThat(nameList.get(0)).isEqualTo("default-name-one");
assertThat(nameList.get(1)).isEqualTo("default-name-two");
}
@Test
public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException {
String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest");
assertThat(topicConfig).isEqualTo("topicListConfig");
KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one");
assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster");
}
@Test
public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest");
assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
}
@Test
public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
assertThat(clean).isTrue();
}
@Test
public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
assertThat(clusterlist.contains("default-cluster-one")).isTrue();
assertThat(clusterlist.contains("default-cluster-two")).isTrue();
}
@Test
public void testFetchConsumeStatsInBroker() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
ConsumeStatsList result = new ConsumeStatsList();
result.setBrokerAddr("127.0.0.1:10911");
when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result);
ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
}
@Test
public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
}
}
\ No newline at end of file
package org.apache.rocketmq.tools.command;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class CommandUtilTest {
private DefaultMQAdminExt defaultMQAdminExt;
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientAPIImpl mQClientAPIImpl;
@Before
public void setup() throws MQClientException, NoSuchFieldException, IllegalAccessException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
defaultMQAdminExt = mock(DefaultMQAdminExt.class);
MQClientAPIImpl mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 3000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
ClusterInfo clusterInfo = new ClusterInfo();
HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
HashMap<String, Set<String>> clusterAddrTable = new HashMap<>();
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(1234l, "127.0.0.1:10911");
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("default-broker");
brokerData.setCluster("default-cluster");
brokerData.setBrokerAddrs(brokerAddrs);
brokerAddrTable.put("default-broker", brokerData);
brokerAddrTable.put("broker-test", new BrokerData());
Set<String> brokerSet = new HashSet<>();
brokerSet.add("default-broker");
brokerSet.add("default-broker-one");
clusterAddrTable.put("default-cluster", brokerSet);
clusterInfo.setBrokerAddrTable(brokerAddrTable);
clusterInfo.setClusterAddrTable(clusterAddrTable);
when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
}
@After
public void shutdown() throws Exception {
}
@Test
public void testFetchMasterAndSlaveDistinguish() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
Map<String, List<String>> result = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExtImpl, "default-cluster");
assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911");
}
@Test
public void testFetchMasterAddrByClusterName() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
Set<String> result = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExtImpl, "default-cluster");
assertThat(result.size()).isEqualTo(0);
}
@Test
public void testFetchBrokerNameByClusterName() throws Exception {
Set<String> result = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExtImpl, "default-cluster");
assertThat(result.contains("default-broker")).isTrue();
assertThat(result.contains("default-broker-one")).isTrue();
assertThat(result.size()).isEqualTo(2);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册