From 67751d818df898b642c48b8e7500d1e9c4926719 Mon Sep 17 00:00:00 2001 From: King <794220751@qq.com> Date: Thu, 22 Aug 2019 15:26:12 +0800 Subject: [PATCH] Add unit test for lite pull consumer (#1410) * Add unit test for lite pull consumer. * Add synchronized to poll function. --- .../consumer/DefaultLitePullConsumerImpl.java | 4 +- .../consumer/DefaultLitePullConsumerTest.java | 206 +++++++++++++++--- 2 files changed, 178 insertions(+), 32 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 5dd9c352..5217a315 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -327,7 +327,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e); } } - }, 1000 * 20, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS); + }, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS); } private void operateAfterRunning() throws MQClientException { @@ -491,7 +491,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - public List poll(long timeout) { + public synchronized List poll(long timeout) { try { checkServiceState(); if (timeout < 0) diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index cbcc7392..7d496acf 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; import org.apache.rocketmq.client.impl.consumer.RebalanceService; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -83,6 +84,7 @@ public class DefaultLitePullConsumerTest { private String consumerGroup = "LitePullConsumerGroup"; private String topic = "LitePullConsumerTest"; private String brokerName = "BrokerA"; + private boolean flag = false; @Before public void init() throws Exception { @@ -155,7 +157,7 @@ public class DefaultLitePullConsumerTest { } @Test - public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws Exception { + public void testFetchMessageQueues_FetchMessageQueuesBeforeStart() throws Exception { DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); try { litePullConsumer.fetchMessageQueues(topic); @@ -167,6 +169,22 @@ public class DefaultLitePullConsumerTest { } } + @Test + public void testSeek_SeekOffsetSuccess() throws Exception { + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L); + when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L); + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + long offset = litePullConsumer.committed(messageQueue); + litePullConsumer.seek(messageQueue, offset); + Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); + field.setAccessible(true); + AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl); + assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), offset); + litePullConsumer.shutdown(); + } + @Test public void testSeek_SeekOffsetIllegal() throws Exception { DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); @@ -190,21 +208,6 @@ public class DefaultLitePullConsumerTest { litePullConsumer.shutdown(); } - @Test - public void testSeek_SeekOffsetSuccess() throws Exception { - DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); - when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L); - when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L); - MessageQueue messageQueue = createMessageQueue(); - litePullConsumer.assign(Collections.singletonList(messageQueue)); - litePullConsumer.seek(messageQueue, 50); - Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); - field.setAccessible(true); - AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl); - assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), 50); - litePullConsumer.shutdown(); - } - @Test public void testSeek_MessageQueueNotInAssignList() throws Exception { DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); @@ -216,6 +219,34 @@ public class DefaultLitePullConsumerTest { } finally { litePullConsumer.shutdown(); } + + litePullConsumer = createSubscribeLitePullConsumer(); + try { + litePullConsumer.seek(createMessageQueue(), 0); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("The message queue is not in assigned list, may be rebalancing"); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testOffsetForTimestamp_FailedAndSuccess() throws Exception { + MessageQueue messageQueue = createMessageQueue(); + DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); + try { + litePullConsumer.offsetForTimestamp(messageQueue, 123456L); + failBecauseExceptionWasNotThrown(IllegalStateException.class); + } catch (IllegalStateException e) { + assertThat(e).hasMessageContaining("The consumer not running, please start it first."); + } finally { + litePullConsumer.shutdown(); + } + doReturn(123L).when(mQAdminImpl).searchOffset(any(MessageQueue.class), anyLong()); + litePullConsumer = createStartLitePullConsumer(); + long offset = litePullConsumer.offsetForTimestamp(messageQueue, 123456L); + assertThat(offset).isEqualTo(123L); } @Test @@ -238,12 +269,120 @@ public class DefaultLitePullConsumerTest { } } - private MessageQueue createMessageQueue() { - MessageQueue messageQueue = new MessageQueue(); - messageQueue.setBrokerName(brokerName); - messageQueue.setQueueId(0); - messageQueue.setTopic(topic); - return messageQueue; + @Test + public void testRegisterTopicMessageQueueChangeListener_Success() throws Exception { + flag = false; + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + doReturn(Collections.emptySet()).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString()); + litePullConsumer.setTopicMetadataCheckIntervalMillis(10); + litePullConsumer.registerTopicMessageQueueChangeListener(topic, new TopicMessageQueueChangeListener() { + @Override public void onChanged(String topic, Set messageQueues) { + flag = true; + } + }); + Set set = new HashSet(); + set.add(createMessageQueue()); + doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString()); + Thread.sleep(11 * 1000); + assertThat(flag).isTrue(); + } + + @Test + public void testFlowControl_Success() throws Exception { + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.setPullThresholdForAll(-1); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + litePullConsumer.setPollTimeoutMillis(500); + List result = litePullConsumer.poll(); + assertThat(result).isEmpty(); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = createStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.setPullThresholdForQueue(-1); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + litePullConsumer.setPollTimeoutMillis(500); + List result = litePullConsumer.poll(); + assertThat(result).isEmpty(); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = createStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.setPullThresholdSizeForQueue(-1); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + litePullConsumer.setPollTimeoutMillis(500); + List result = litePullConsumer.poll(); + assertThat(result).isEmpty(); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = createStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.setConsumeMaxSpan(-1); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + litePullConsumer.setPollTimeoutMillis(500); + List result = litePullConsumer.poll(); + assertThat(result).isEmpty(); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testCheckConfig_Exception() { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(MixAll.DEFAULT_CONSUMER_GROUP); + try { + litePullConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("consumerGroup can not equal"); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setMessageModel(null); + try { + litePullConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("messageModel is null"); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setAllocateMessageQueueStrategy(null); + try { + litePullConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("allocateMessageQueueStrategy is null"); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setConsumerTimeoutMillisWhenSuspend(1); + try { + litePullConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"); + } finally { + litePullConsumer.shutdown(); + } + } private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception { @@ -310,33 +449,40 @@ public class DefaultLitePullConsumerTest { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.subscribe(topic, "*"); - litePullConsumer.start(); initDefaultLitePullConsumer(litePullConsumer); return litePullConsumer; } - private DefaultLitePullConsumer createBroadcastLitePullConsumer() throws Exception { + private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); - litePullConsumer.setMessageModel(MessageModel.BROADCASTING); - litePullConsumer.subscribe(topic, "*"); litePullConsumer.start(); initDefaultLitePullConsumer(litePullConsumer); return litePullConsumer; } - private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception { + private DefaultLitePullConsumer createNotStartLitePullConsumer() { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + return litePullConsumer; + } + + private DefaultLitePullConsumer createBroadcastLitePullConsumer() throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + litePullConsumer.setMessageModel(MessageModel.BROADCASTING); + litePullConsumer.subscribe(topic, "*"); litePullConsumer.start(); initDefaultLitePullConsumer(litePullConsumer); return litePullConsumer; } - private DefaultLitePullConsumer createNotStartLitePullConsumer() { - DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); - return litePullConsumer; + private MessageQueue createMessageQueue() { + MessageQueue messageQueue = new MessageQueue(); + messageQueue.setBrokerName(brokerName); + messageQueue.setQueueId(0); + messageQueue.setTopic(topic); + return messageQueue; } private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, -- GitLab