提交 d7c2d25c 编写于 作者: Z zanglei

[ISSUE #2708] fix offset rollback when fetch offset from broker exception

上级 6aabf776
...@@ -785,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -785,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long offset = 0L; long offset = 0L;
try { try {
offset = nextPullOffset(messageQueue); offset = nextPullOffset(messageQueue);
} catch (MQClientException e) { } catch (Exception e) {
log.error("Failed to get next pull offset", e); log.error("Failed to get next pull offset", e);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return; return;
......
...@@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long offset = -1L; long offset = -1L;
try { try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (MQClientException e) { } catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return; return;
......
...@@ -378,7 +378,7 @@ public abstract class RebalanceImpl { ...@@ -378,7 +378,7 @@ public abstract class RebalanceImpl {
long nextOffset = -1L; long nextOffset = -1L;
try { try {
nextOffset = this.computePullFromWhereWithException(mq); nextOffset = this.computePullFromWhereWithException(mq);
} catch (MQClientException e) { } catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue; continue;
} }
......
...@@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { ...@@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
try { try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) { } catch (MQClientException e) {
result = -1; log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
} }
} }
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册