未验证 提交 7953c4f5 编写于 作者: L lebron374 提交者: GitHub

[ISSUE #2044] Fix DefaultLitePullConsumerImpl NPE (#2059)

* fix DefaultLitePullConsumerImpl NPE

* add ut
上级 49a722f4
......@@ -691,7 +691,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
if (processQueue == null && processQueue.isDropped()) {
if (null == processQueue || processQueue.isDropped()) {
log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}
......
......@@ -81,6 +81,8 @@ public class DefaultLitePullConsumerTest {
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private MQAdminImpl mQAdminImpl;
@Mock
private AssignedMessageQueue assignedMQ;
private RebalanceImpl rebalanceImpl;
private OffsetStore offsetStore;
......@@ -304,6 +306,53 @@ public class DefaultLitePullConsumerTest {
}
}
@Test
public void testPullTaskImpl_ProcessQueueNull() throws Exception {
DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
// set ProcessQueue dropped = true
DefaultLitePullConsumerImpl localLitePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
when(assignedMQ.isPaused(any(MessageQueue.class))).thenReturn(false);
when(assignedMQ.getProcessQueue(any(MessageQueue.class))).thenReturn(null);
litePullConsumer.start();
field.set(localLitePullConsumerImpl, assignedMQ);
List<MessageExt> result = litePullConsumer.poll(100);
assertThat(result.isEmpty()).isTrue();
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testPullTaskImpl_ProcessQueueDropped() throws Exception {
DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
// set ProcessQueue dropped = true
DefaultLitePullConsumerImpl localLitePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(localLitePullConsumerImpl);
assignedMessageQueue.getProcessQueue(messageQueue).setDropped(true);
litePullConsumer.start();
List<MessageExt> result = litePullConsumer.poll(100);
assertThat(result.isEmpty()).isTrue();
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testRegisterTopicMessageQueueChangeListener_Success() throws Exception {
flag = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册