From 03c1f11d6bc3034daca519de043fd4d2f69bb047 Mon Sep 17 00:00:00 2001 From: Alvin <329772643@qq.com> Date: Sun, 1 Nov 2020 22:56:47 +0800 Subject: [PATCH] [ISSUE #2378] FIx `NullPointerException` when Consumer shutdown in the ClientRemotingProcessor. --- .../client/impl/factory/MQClientInstance.java | 3 ++ .../impl/factory/MQClientInstanceTest.java | 29 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index b5aaeb8e..d40bdc2d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1214,6 +1214,9 @@ public class MQClientInstance { public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) { MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); + if (mqConsumerInner == null) { + return null; + } ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index e0506aa1..a3457e18 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.factory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.ClientConfig; @@ -29,6 +30,8 @@ import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; @@ -41,6 +44,7 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class MQClientInstanceTest { @@ -139,6 +143,31 @@ public class MQClientInstanceTest { assertThat(flag).isTrue(); } + + @Test + public void testConsumerRunningInfoWhenConsumersIsEmptyOrNot() throws RemotingException, InterruptedException, MQBrokerException { + MQConsumerInner mockConsumerInner = mock(MQConsumerInner.class); + ConsumerRunningInfo mockConsumerRunningInfo = mock(ConsumerRunningInfo.class); + when(mockConsumerInner.consumerRunningInfo()).thenReturn(mockConsumerRunningInfo); + when(mockConsumerInner.consumeType()).thenReturn(ConsumeType.CONSUME_PASSIVELY); + Properties properties = new Properties(); + when(mockConsumerRunningInfo.getProperties()).thenReturn(properties); + mqClientInstance.unregisterConsumer(group); + + ConsumerRunningInfo runningInfo = mqClientInstance.consumerRunningInfo(group); + assertThat(runningInfo).isNull(); + boolean flag = mqClientInstance.registerConsumer(group, mockConsumerInner); + assertThat(flag).isTrue(); + + runningInfo = mqClientInstance.consumerRunningInfo(group); + assertThat(runningInfo).isNotNull(); + assertThat(mockConsumerInner.consumerRunningInfo().getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)); + + mqClientInstance.unregisterConsumer(group); + flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class)); + assertThat(flag).isTrue(); + } + @Test public void testRegisterAdminExt() { boolean flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class)); -- GitLab