diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index b5aaeb8e438900146c7bb9d1721edc49b5074190..d40bdc2d991c20d3b3b8f5b3324062b1e9d036dc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1214,6 +1214,9 @@ public class MQClientInstance { public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) { MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); + if (mqConsumerInner == null) { + return null; + } ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index e0506aa144a5cd42b78bf0f5aead2fbc2cc0f3fe..a3457e186edf147f9f25ab8d868ecefd56f3bbc8 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.factory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.ClientConfig; @@ -29,6 +30,8 @@ import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; @@ -41,6 +44,7 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class MQClientInstanceTest { @@ -139,6 +143,31 @@ public class MQClientInstanceTest { assertThat(flag).isTrue(); } + + @Test + public void testConsumerRunningInfoWhenConsumersIsEmptyOrNot() throws RemotingException, InterruptedException, MQBrokerException { + MQConsumerInner mockConsumerInner = mock(MQConsumerInner.class); + ConsumerRunningInfo mockConsumerRunningInfo = mock(ConsumerRunningInfo.class); + when(mockConsumerInner.consumerRunningInfo()).thenReturn(mockConsumerRunningInfo); + when(mockConsumerInner.consumeType()).thenReturn(ConsumeType.CONSUME_PASSIVELY); + Properties properties = new Properties(); + when(mockConsumerRunningInfo.getProperties()).thenReturn(properties); + mqClientInstance.unregisterConsumer(group); + + ConsumerRunningInfo runningInfo = mqClientInstance.consumerRunningInfo(group); + assertThat(runningInfo).isNull(); + boolean flag = mqClientInstance.registerConsumer(group, mockConsumerInner); + assertThat(flag).isTrue(); + + runningInfo = mqClientInstance.consumerRunningInfo(group); + assertThat(runningInfo).isNotNull(); + assertThat(mockConsumerInner.consumerRunningInfo().getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)); + + mqClientInstance.unregisterConsumer(group); + flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class)); + assertThat(flag).isTrue(); + } + @Test public void testRegisterAdminExt() { boolean flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));