From bf848c14e1a9540f956aff6856bf4ee62b865816 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Sun, 8 Apr 2018 16:32:31 +0800 Subject: [PATCH] Check if pull message service has shutdown before scheduling pull requests (#277) --- .../impl/consumer/PullMessageService.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java index 5f22f041..bd46a588 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java @@ -44,13 +44,16 @@ public class PullMessageService extends ServiceThread { } public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { - this.scheduledExecutorService.schedule(new Runnable() { - - @Override - public void run() { - PullMessageService.this.executePullRequestImmediately(pullRequest); - } - }, timeDelay, TimeUnit.MILLISECONDS); + if (!isStopped()) { + this.scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + PullMessageService.this.executePullRequestImmediately(pullRequest); + } + }, timeDelay, TimeUnit.MILLISECONDS); + } else { + log.warn("PullMessageServiceScheduledThread has shutdown"); + } } public void executePullRequestImmediately(final PullRequest pullRequest) { @@ -62,7 +65,11 @@ public class PullMessageService extends ServiceThread { } public void executeTaskLater(final Runnable r, final long timeDelay) { - this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS); + if (!isStopped()) { + this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS); + } else { + log.warn("PullMessageServiceScheduledThread has shutdown"); + } } public ScheduledExecutorService getScheduledExecutorService() { @@ -86,10 +93,8 @@ public class PullMessageService extends ServiceThread { while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); - if (pullRequest != null) { - this.pullMessage(pullRequest); - } - } catch (InterruptedException e) { + this.pullMessage(pullRequest); + } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } -- GitLab