未验证 提交 03c1f11d 编写于 作者: A Alvin 提交者: GitHub

[ISSUE #2378] FIx `NullPointerException` when Consumer shutdown in the ClientRemotingProcessor.

上级 f5a119f1
......@@ -1214,6 +1214,9 @@ public class MQClientInstance {
public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (mqConsumerInner == null) {
return null;
}
ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.factory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.ClientConfig;
......@@ -29,6 +30,8 @@ import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
......@@ -41,6 +44,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class MQClientInstanceTest {
......@@ -139,6 +143,31 @@ public class MQClientInstanceTest {
assertThat(flag).isTrue();
}
@Test
public void testConsumerRunningInfoWhenConsumersIsEmptyOrNot() throws RemotingException, InterruptedException, MQBrokerException {
MQConsumerInner mockConsumerInner = mock(MQConsumerInner.class);
ConsumerRunningInfo mockConsumerRunningInfo = mock(ConsumerRunningInfo.class);
when(mockConsumerInner.consumerRunningInfo()).thenReturn(mockConsumerRunningInfo);
when(mockConsumerInner.consumeType()).thenReturn(ConsumeType.CONSUME_PASSIVELY);
Properties properties = new Properties();
when(mockConsumerRunningInfo.getProperties()).thenReturn(properties);
mqClientInstance.unregisterConsumer(group);
ConsumerRunningInfo runningInfo = mqClientInstance.consumerRunningInfo(group);
assertThat(runningInfo).isNull();
boolean flag = mqClientInstance.registerConsumer(group, mockConsumerInner);
assertThat(flag).isTrue();
runningInfo = mqClientInstance.consumerRunningInfo(group);
assertThat(runningInfo).isNotNull();
assertThat(mockConsumerInner.consumerRunningInfo().getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE));
mqClientInstance.unregisterConsumer(group);
flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class));
assertThat(flag).isTrue();
}
@Test
public void testRegisterAdminExt() {
boolean flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册