diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java index 5f22f041eb2b52c993fdc229916eeb2c92c90cd8..bd46a58859acf59714eeba794ad5fa4ac247bebf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java @@ -44,13 +44,16 @@ public class PullMessageService extends ServiceThread { } public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { - this.scheduledExecutorService.schedule(new Runnable() { - - @Override - public void run() { - PullMessageService.this.executePullRequestImmediately(pullRequest); - } - }, timeDelay, TimeUnit.MILLISECONDS); + if (!isStopped()) { + this.scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + PullMessageService.this.executePullRequestImmediately(pullRequest); + } + }, timeDelay, TimeUnit.MILLISECONDS); + } else { + log.warn("PullMessageServiceScheduledThread has shutdown"); + } } public void executePullRequestImmediately(final PullRequest pullRequest) { @@ -62,7 +65,11 @@ public class PullMessageService extends ServiceThread { } public void executeTaskLater(final Runnable r, final long timeDelay) { - this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS); + if (!isStopped()) { + this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS); + } else { + log.warn("PullMessageServiceScheduledThread has shutdown"); + } } public ScheduledExecutorService getScheduledExecutorService() { @@ -86,10 +93,8 @@ public class PullMessageService extends ServiceThread { while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); - if (pullRequest != null) { - this.pullMessage(pullRequest); - } - } catch (InterruptedException e) { + this.pullMessage(pullRequest); + } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); }