From d7c2d25c2a7344506ee001f7dfaf72f62c5f6c47 Mon Sep 17 00:00:00 2001 From: zanglei Date: Fri, 16 Jul 2021 11:01:15 +0800 Subject: [PATCH] [ISSUE #2708] fix offset rollback when fetch offset from broker exception --- .../client/impl/consumer/DefaultLitePullConsumerImpl.java | 2 +- .../client/impl/consumer/DefaultMQPushConsumerImpl.java | 2 +- .../apache/rocketmq/client/impl/consumer/RebalanceImpl.java | 2 +- .../rocketmq/client/impl/consumer/RebalanceLitePullImpl.java | 3 ++- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index e835be1c..d28d23ad 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -785,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { long offset = 0L; try { offset = nextPullOffset(messageQueue); - } catch (MQClientException e) { + } catch (Exception e) { log.error("Failed to get next pull offset", e); scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); return; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index bb0b7f10..59b8deb3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { long offset = -1L; try { offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); - } catch (MQClientException e) { + } catch (Exception e) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); return; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 833d465a..7677d8b6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -378,7 +378,7 @@ public abstract class RebalanceImpl { long nextOffset = -1L; try { nextOffset = this.computePullFromWhereWithException(mq); - } catch (MQClientException e) { + } catch (Exception e) { log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); continue; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 286c684e..8fe9400b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else { -- GitLab