未验证 提交 9ddcab41 编写于 作者: anotherJJz's avatar anotherJJz 提交者: GitHub

[ISSUE apache#2152] Add isRunning method in DefaultLitePullConsumerImpl class and test suit (#2302)

上级 2a8ba5a7
...@@ -211,6 +211,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -211,6 +211,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.defaultLitePullConsumerImpl.shutdown(); this.defaultLitePullConsumerImpl.shutdown();
} }
@Override
public boolean isRunning() {
return this.defaultLitePullConsumerImpl.isRunning();
}
@Override @Override
public void subscribe(String topic, String subExpression) throws MQClientException { public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression); this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
......
...@@ -35,6 +35,13 @@ public interface LitePullConsumer { ...@@ -35,6 +35,13 @@ public interface LitePullConsumer {
*/ */
void shutdown(); void shutdown();
/**
* This consumer is still running
*
* @return true if consumer is still running
*/
boolean isRunning();
/** /**
* Subscribe some topic with subExpression * Subscribe some topic with subExpression
* *
......
...@@ -229,6 +229,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -229,6 +229,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
public synchronized boolean isRunning() {
return this.serviceState == ServiceState.RUNNING;
}
public synchronized void start() throws MQClientException { public synchronized void start() throws MQClientException {
switch (this.serviceState) { switch (this.serviceState) {
case CREATE_JUST: case CREATE_JUST:
......
...@@ -513,6 +513,32 @@ public class DefaultLitePullConsumerTest { ...@@ -513,6 +513,32 @@ public class DefaultLitePullConsumerTest {
assertThat(offset).isEqualTo(100); assertThat(offset).isEqualTo(100);
} }
@Test
public void testConsumerAfterShutdown() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setNamesrvAddr("127.0.0.1:9876");
defaultLitePullConsumer.subscribe(topic, "*");
new AsyncConsumer().executeAsync(defaultLitePullConsumer);
Thread.sleep(10 * 1000);
defaultLitePullConsumer.shutdown();
assertThat(defaultLitePullConsumer.isRunning()).isFalse();
}
static class AsyncConsumer {
public void executeAsync(final DefaultLitePullConsumer consumer) {
new Thread(new Runnable() {
@Override
public void run() {
while (consumer.isRunning()) {
List<MessageExt> poll = consumer.poll(2 * 1000);
System.out.println("consumer is still running");
}
System.out.println("consumer shutdown");
}
}).start();
}
}
private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception { private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册