未验证 提交 2820e477 编写于 作者: G guyinyou 提交者: GitHub

[ISSUE #3215] polish litePullConsumer seek logic #3216

上级 a7e71cb4
......@@ -593,8 +593,19 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
assignedMessageQueue.setSeekOffset(messageQueue, offset);
clearMessageQueueInCache(messageQueue);
PullTaskImpl oldPullTaskImpl = this.taskTable.get(messageQueue);
if (oldPullTaskImpl != null) {
oldPullTaskImpl.tryInterrupt();
this.taskTable.remove(messageQueue);
}
assignedMessageQueue.setSeekOffset(messageQueue, offset);
if (!this.taskTable.containsKey(messageQueue)) {
PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
this.taskTable.put(messageQueue, pullTask);
this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
}
}
}
......@@ -718,16 +729,29 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;
private Thread currentThread;
public PullTaskImpl(final MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public void tryInterrupt() {
setCancelled(true);
if (currentThread == null) {
return;
}
if (!currentThread.isInterrupted()) {
currentThread.interrupt();
}
}
@Override
public void run() {
if (!this.isCancelled()) {
this.currentThread = Thread.currentThread();
if (assignedMessageQueue.isPaused(messageQueue)) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
log.debug("Message Queue: {} has been paused!", messageQueue);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册