From 02e2abb5eecf76ea2cd5ab2d30a6927441c60067 Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Fri, 8 Mar 2019 17:17:51 +0800 Subject: [PATCH] [RIP-10] Add test cases for ConsumerRunningInfo (#923) --- .../body/ConsumerRunningInfoTest.java | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 common/src/test/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfoTest.java diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfoTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfoTest.java new file mode 100644 index 00000000..b3718938 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfoTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.junit.Before; +import org.junit.Test; + +import java.util.Properties; +import java.util.TreeMap; +import java.util.TreeSet; + +import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ConsumerRunningInfoTest { + + private ConsumerRunningInfo consumerRunningInfo; + + private TreeMap criTable; + + private MessageQueue messageQueue; + + @Before + public void init() { + consumerRunningInfo = new ConsumerRunningInfo(); + consumerRunningInfo.setJstack("test"); + + TreeMap mqTable = new TreeMap(); + messageQueue = new MessageQueue("topicA","broker", 1); + mqTable.put(messageQueue, new ProcessQueueInfo()); + consumerRunningInfo.setMqTable(mqTable); + + TreeMap statusTable = new TreeMap(); + statusTable.put("topicA", new ConsumeStatus()); + consumerRunningInfo.setStatusTable(statusTable); + + TreeSet subscriptionSet = new TreeSet(); + subscriptionSet.add(new SubscriptionData()); + consumerRunningInfo.setSubscriptionSet(subscriptionSet); + + Properties properties = new Properties(); + properties.put(ConsumerRunningInfo.PROP_CONSUME_TYPE, CONSUME_ACTIVELY); + properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, System.currentTimeMillis()); + consumerRunningInfo.setProperties(properties); + + criTable = new TreeMap(); + criTable.put("client_id", consumerRunningInfo); + } + + @Test + public void testFromJson() { + String toJson = RemotingSerializable.toJson(consumerRunningInfo, true); + ConsumerRunningInfo fromJson = RemotingSerializable.fromJson(toJson, ConsumerRunningInfo.class); + + assertThat(fromJson.getJstack()).isEqualTo("test"); + assertThat(fromJson.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).isEqualTo(ConsumeType.CONSUME_ACTIVELY.name()); + + ConsumeStatus consumeStatus = fromJson.getStatusTable().get("topicA"); + assertThat(consumeStatus).isExactlyInstanceOf(ConsumeStatus.class); + + SubscriptionData subscription = fromJson.getSubscriptionSet().first(); + assertThat(subscription).isExactlyInstanceOf(SubscriptionData.class); + + ProcessQueueInfo processQueueInfo = fromJson.getMqTable().get(messageQueue); + assertThat(processQueueInfo).isExactlyInstanceOf(ProcessQueueInfo.class); + } + + @Test + public void testAnalyzeRebalance(){ + boolean result = ConsumerRunningInfo.analyzeRebalance(criTable); + assertThat(result).isTrue(); + } + + @Test + public void testAnalyzeProcessQueue(){ + String result = ConsumerRunningInfo.analyzeProcessQueue("client_id", consumerRunningInfo); + assertThat(result).isEmpty(); + + } + + @Test + public void testAnalyzeSubscription(){ + boolean result = ConsumerRunningInfo.analyzeSubscription(criTable); + assertThat(result).isTrue(); + } + + +} -- GitLab