diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 9cc7c604fb23de789d57e1255b94878cd5b0ecfe..393bda928e5da1464acb9292c1f2b73aa7017cd4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -211,6 +211,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon this.defaultLitePullConsumerImpl.shutdown(); } + @Override + public boolean isRunning() { + return this.defaultLitePullConsumerImpl.isRunning(); + } + @Override public void subscribe(String topic, String subExpression) throws MQClientException { this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java index ce2228803383d7fc1502b0fa5babd9bd1ec5c0ac..25b11046f84d7e98ecadd1b7cf8af6ae80625107 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java @@ -35,6 +35,13 @@ public interface LitePullConsumer { */ void shutdown(); + /** + * This consumer is still running + * + * @return true if consumer is still running + */ + boolean isRunning(); + /** * Subscribe some topic with subExpression * diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index e3d60ffadde60bd9e44c1af36735b0584998f78a..676c03cef41ccc8891a170b954fe18b48743c189 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -229,6 +229,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } + public synchronized boolean isRunning() { + return this.serviceState == ServiceState.RUNNING; + } + public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index de2f608382a6e81d05471a38cb4e6c6178cec01b..3726a5b37d5f06c09f0d6ebdd10f9434c8010c77 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -513,6 +513,32 @@ public class DefaultLitePullConsumerTest { assertThat(offset).isEqualTo(100); } + @Test + public void testConsumerAfterShutdown() throws Exception { + DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer(); + defaultLitePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + defaultLitePullConsumer.subscribe(topic, "*"); + new AsyncConsumer().executeAsync(defaultLitePullConsumer); + Thread.sleep(10 * 1000); + defaultLitePullConsumer.shutdown(); + assertThat(defaultLitePullConsumer.isRunning()).isFalse(); + } + + static class AsyncConsumer { + public void executeAsync(final DefaultLitePullConsumer consumer) { + new Thread(new Runnable() { + @Override + public void run() { + while (consumer.isRunning()) { + List poll = consumer.poll(2 * 1000); + System.out.println("consumer is still running"); + } + System.out.println("consumer shutdown"); + } + }).start(); + } + } + private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception { Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");