未验证 提交 5b945a90 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #3158 from Zanglei06/fix_rmq_client_offset_dev

[ISSUE #2708] fix offset rollback when fetch offset from broker exception
......@@ -785,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long offset = 0L;
try {
offset = nextPullOffset(messageQueue);
} catch (MQClientException e) {
} catch (Exception e) {
log.error("Failed to get next pull offset", e);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return;
......
......@@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (MQClientException e) {
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
......
......@@ -378,7 +378,7 @@ public abstract class RebalanceImpl {
long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (MQClientException e) {
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
......
......@@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
}
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册