From 2820e477ec8586a397906a46e255898d1b08bd74 Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Mon, 23 Aug 2021 11:43:28 +0800 Subject: [PATCH] [ISSUE #3215] polish litePullConsumer seek logic #3216 --- .../consumer/DefaultLitePullConsumerImpl.java | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index d28d23ad..46a72fb5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -593,8 +593,19 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } final Object objLock = messageQueueLock.fetchLockObject(messageQueue); synchronized (objLock) { - assignedMessageQueue.setSeekOffset(messageQueue, offset); clearMessageQueueInCache(messageQueue); + + PullTaskImpl oldPullTaskImpl = this.taskTable.get(messageQueue); + if (oldPullTaskImpl != null) { + oldPullTaskImpl.tryInterrupt(); + this.taskTable.remove(messageQueue); + } + assignedMessageQueue.setSeekOffset(messageQueue, offset); + if (!this.taskTable.containsKey(messageQueue)) { + PullTaskImpl pullTask = new PullTaskImpl(messageQueue); + this.taskTable.put(messageQueue, pullTask); + this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); + } } } @@ -718,16 +729,29 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public class PullTaskImpl implements Runnable { private final MessageQueue messageQueue; private volatile boolean cancelled = false; + private Thread currentThread; public PullTaskImpl(final MessageQueue messageQueue) { this.messageQueue = messageQueue; } + public void tryInterrupt() { + setCancelled(true); + if (currentThread == null) { + return; + } + if (!currentThread.isInterrupted()) { + currentThread.interrupt(); + } + } + @Override public void run() { if (!this.isCancelled()) { + this.currentThread = Thread.currentThread(); + if (assignedMessageQueue.isPaused(messageQueue)) { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS); log.debug("Message Queue: {} has been paused!", messageQueue); @@ -803,7 +827,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } else { subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); } - + PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); if (this.isCancelled() || processQueue.isDropped()) { return; -- GitLab