diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index f54078fcfdc687a650444e147d5cbb59779b6fe9..e3d60ffadde60bd9e44c1af36735b0584998f78a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -691,7 +691,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - if (processQueue == null && processQueue.isDropped()) { + if (null == processQueue || processQueue.isDropped()) { log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); return; } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index cc8d5e2bf78a8ba0c840c1ec4e1045b89f931009..de2f608382a6e81d05471a38cb4e6c6178cec01b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -81,6 +81,8 @@ public class DefaultLitePullConsumerTest { private MQClientAPIImpl mQClientAPIImpl; @Mock private MQAdminImpl mQAdminImpl; + @Mock + private AssignedMessageQueue assignedMQ; private RebalanceImpl rebalanceImpl; private OffsetStore offsetStore; @@ -304,6 +306,53 @@ public class DefaultLitePullConsumerTest { } } + @Test + public void testPullTaskImpl_ProcessQueueNull() throws Exception { + DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); + field.setAccessible(true); + // set ProcessQueue dropped = true + DefaultLitePullConsumerImpl localLitePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); + field.setAccessible(true); + when(assignedMQ.isPaused(any(MessageQueue.class))).thenReturn(false); + when(assignedMQ.getProcessQueue(any(MessageQueue.class))).thenReturn(null); + litePullConsumer.start(); + field.set(localLitePullConsumerImpl, assignedMQ); + + List result = litePullConsumer.poll(100); + assertThat(result.isEmpty()).isTrue(); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testPullTaskImpl_ProcessQueueDropped() throws Exception { + DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); + field.setAccessible(true); + // set ProcessQueue dropped = true + DefaultLitePullConsumerImpl localLitePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); + field.setAccessible(true); + AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(localLitePullConsumerImpl); + assignedMessageQueue.getProcessQueue(messageQueue).setDropped(true); + litePullConsumer.start(); + + List result = litePullConsumer.poll(100); + assertThat(result.isEmpty()).isTrue(); + } finally { + litePullConsumer.shutdown(); + } + } + @Test public void testRegisterTopicMessageQueueChangeListener_Success() throws Exception { flag = false;