From da589b19755c0c73830718f5d3917d4fdba70e26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=9C=E9=98=B3?= Date: Tue, 25 May 2021 22:13:12 +0800 Subject: [PATCH] [ISSUE #2708] Client may submit wrong offset when network instability --- .../impl/consumer/DefaultLitePullConsumerImpl.java | 5 ++++- .../impl/consumer/DefaultMQPushConsumerImpl.java | 8 +++++--- .../rocketmq/client/impl/consumer/PullRequest.java | 10 +++++----- .../rocketmq/client/impl/consumer/RebalanceImpl.java | 2 +- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 32b05d55..1e6b8dc9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -115,6 +115,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { */ private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000; + private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000; + private DefaultLitePullConsumer defaultLitePullConsumer; private final ConcurrentMap taskTable = @@ -743,7 +745,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { try { offset = nextPullOffset(messageQueue); } catch (MQClientException e) { - log.error("get next pull offset failed, maybe timeout exception"); + log.error("Failed to get next pull offset", e); + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); return; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 52cd25fa..e3db9a35 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -269,12 +269,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } else { if (processQueue.isLocked()) { - if (!pullRequest.isLockedFirst()) { + if (!pullRequest.isPreviouslyLocked()) { long offset = -1L; try { offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); } catch (MQClientException e) { - log.error("compute consume offset failed, maybe timeout exception"); + this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); + log.error("Failed to compute pull offset", e); + return; } boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", @@ -284,7 +286,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { pullRequest, offset); } - pullRequest.setLockedFirst(true); + pullRequest.setPreviouslyLocked(true); pullRequest.setNextOffset(offset); } } else { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java index 10aded07..bf03ec38 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java @@ -23,14 +23,14 @@ public class PullRequest { private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; - private boolean lockedFirst = false; + private boolean previouslyLocked = false; - public boolean isLockedFirst() { - return lockedFirst; + public boolean isPreviouslyLocked() { + return previouslyLocked; } - public void setLockedFirst(boolean lockedFirst) { - this.lockedFirst = lockedFirst; + public void setPreviouslyLocked(boolean previouslyLocked) { + this.previouslyLocked = previouslyLocked; } public String getConsumerGroup() { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 8d31314b..833d465a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -375,7 +375,7 @@ public abstract class RebalanceImpl { this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); - long nextOffset = 0L; + long nextOffset = -1L; try { nextOffset = this.computePullFromWhereWithException(mq); } catch (MQClientException e) { -- GitLab