diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index d28d23ad6bbb5c5f765ef8f1e470dcf00a15d237..46a72fb5e848e8fff6848ffd2a40128bdedddc94 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -593,8 +593,19 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } final Object objLock = messageQueueLock.fetchLockObject(messageQueue); synchronized (objLock) { - assignedMessageQueue.setSeekOffset(messageQueue, offset); clearMessageQueueInCache(messageQueue); + + PullTaskImpl oldPullTaskImpl = this.taskTable.get(messageQueue); + if (oldPullTaskImpl != null) { + oldPullTaskImpl.tryInterrupt(); + this.taskTable.remove(messageQueue); + } + assignedMessageQueue.setSeekOffset(messageQueue, offset); + if (!this.taskTable.containsKey(messageQueue)) { + PullTaskImpl pullTask = new PullTaskImpl(messageQueue); + this.taskTable.put(messageQueue, pullTask); + this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); + } } } @@ -718,16 +729,29 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public class PullTaskImpl implements Runnable { private final MessageQueue messageQueue; private volatile boolean cancelled = false; + private Thread currentThread; public PullTaskImpl(final MessageQueue messageQueue) { this.messageQueue = messageQueue; } + public void tryInterrupt() { + setCancelled(true); + if (currentThread == null) { + return; + } + if (!currentThread.isInterrupted()) { + currentThread.interrupt(); + } + } + @Override public void run() { if (!this.isCancelled()) { + this.currentThread = Thread.currentThread(); + if (assignedMessageQueue.isPaused(messageQueue)) { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS); log.debug("Message Queue: {} has been paused!", messageQueue); @@ -803,7 +827,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } else { subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); } - + PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); if (this.isCancelled() || processQueue.isDropped()) { return;