diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index e835be1cd5202ce42eb970e42468e1a2524baac1..d28d23ad6bbb5c5f765ef8f1e470dcf00a15d237 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -785,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { long offset = 0L; try { offset = nextPullOffset(messageQueue); - } catch (MQClientException e) { + } catch (Exception e) { log.error("Failed to get next pull offset", e); scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); return; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index bb0b7f10436ad7ef0844bad9ec09849c72b5940c..59b8deb3fba71c0bbfe385c92ceb538fac15b2de 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { long offset = -1L; try { offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); - } catch (MQClientException e) { + } catch (Exception e) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); return; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 833d465a4703184be6e9a17c20682fdd5d81b240..7677d8b685f43c43c789bfdffa7b42975f9f49ad 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -378,7 +378,7 @@ public abstract class RebalanceImpl { long nextOffset = -1L; try { nextOffset = this.computePullFromWhereWithException(mq); - } catch (MQClientException e) { + } catch (Exception e) { log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); continue; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 286c684e43a52dc3759c0f3b9393e0ee75851ef4..8fe9400b0f74ab92c712051d40c937915365ab45 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else {