diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index fad0b4f1b584d301c7683aac3be36531627e2721..4d18a9be1b7c7fbdb870fef98a9d45075879c142 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -83,9 +83,12 @@ public class AssignedMessageQueue { return -1; } - public void updatePullOffset(MessageQueue messageQueue, long offset) { + public void updatePullOffset(MessageQueue messageQueue, long offset, ProcessQueue processQueue) { MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); if (messageQueueState != null) { + if (messageQueueState.getProcessQueue() != processQueue) { + return; + } messageQueueState.setPullOffset(offset); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index f832370987ab32c7a83964bac2faad1f3795755e..280de46ff2d6d8a09e202083e48948f38ce10b9c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -612,9 +612,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset) { + private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset, ProcessQueue processQueue) { if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) { - assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset); + assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset, processQueue); } } @@ -740,6 +740,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } long offset = nextPullOffset(messageQueue); + if (this.isCancelled() || processQueue.isDropped()) { + return; + } long pullDelayTimeMills = 0; try { SubscriptionData subscriptionData; @@ -752,7 +755,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); - + if (this.isCancelled() || processQueue.isDropped()) { + return; + } switch (pullResult.getPullStatus()) { case FOUND: final Object objLock = messageQueueLock.fetchLockObject(messageQueue); @@ -769,7 +774,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { default: break; } - updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); + updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue); } catch (Throwable e) { pullDelayTimeMills = pullTimeDelayMillsWhenException; log.error("An error occurred in pull message process.", e);