diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 32b05d55e26d34c80400c2cd22f5dd08b5dfd8d6..1e6b8dc99699aa9236c9e8a3dea0825606b74e2f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -115,6 +115,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { */ private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000; + private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000; + private DefaultLitePullConsumer defaultLitePullConsumer; private final ConcurrentMap taskTable = @@ -743,7 +745,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { try { offset = nextPullOffset(messageQueue); } catch (MQClientException e) { - log.error("get next pull offset failed, maybe timeout exception"); + log.error("Failed to get next pull offset", e); + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); return; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 52cd25fa8aa88f303ca11c06b6369138d9f9bd82..e3db9a35a37a1ba797a78373f82882877777cf08 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -269,12 +269,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } else { if (processQueue.isLocked()) { - if (!pullRequest.isLockedFirst()) { + if (!pullRequest.isPreviouslyLocked()) { long offset = -1L; try { offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); } catch (MQClientException e) { - log.error("compute consume offset failed, maybe timeout exception"); + this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); + log.error("Failed to compute pull offset", e); + return; } boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", @@ -284,7 +286,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { pullRequest, offset); } - pullRequest.setLockedFirst(true); + pullRequest.setPreviouslyLocked(true); pullRequest.setNextOffset(offset); } } else { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java index 10aded076252b7513b5eb42c67ebb28136f42930..bf03ec38c3a68d9a789504f3291b9b66d4c833cf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java @@ -23,14 +23,14 @@ public class PullRequest { private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; - private boolean lockedFirst = false; + private boolean previouslyLocked = false; - public boolean isLockedFirst() { - return lockedFirst; + public boolean isPreviouslyLocked() { + return previouslyLocked; } - public void setLockedFirst(boolean lockedFirst) { - this.lockedFirst = lockedFirst; + public void setPreviouslyLocked(boolean previouslyLocked) { + this.previouslyLocked = previouslyLocked; } public String getConsumerGroup() { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 8d31314b0f73e80d17f157180f5b74887d628592..833d465a4703184be6e9a17c20682fdd5d81b240 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -375,7 +375,7 @@ public abstract class RebalanceImpl { this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); - long nextOffset = 0L; + long nextOffset = -1L; try { nextOffset = this.computePullFromWhereWithException(mq); } catch (MQClientException e) {