From 6ab6afd006fcef784b3e3202b497080fcf4d023a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=9C=E9=98=B3?= Date: Tue, 25 May 2021 22:02:43 +0800 Subject: [PATCH] [ISSUE #2708] Client may submit wrong offset when network instability --- .../consumer/DefaultLitePullConsumerImpl.java | 15 +++++++++---- .../consumer/DefaultMQPushConsumerImpl.java | 7 ++++++- .../client/impl/consumer/RebalanceImpl.java | 20 +++++++++++++++++- .../impl/consumer/RebalanceLitePullImpl.java | 18 ++++++++++++++-- .../impl/consumer/RebalancePullImpl.java | 7 +++++++ .../impl/consumer/RebalancePushImpl.java | 21 ++++++++++++++++--- 6 files changed, 77 insertions(+), 11 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 280de46f..32b05d55 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -626,9 +626,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - private long fetchConsumeOffset(MessageQueue messageQueue) { + private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException { checkServiceState(); - long offset = this.rebalanceImpl.computePullFromWhere(messageQueue); + long offset = this.rebalanceImpl.computePullFromWhereWithException(messageQueue); return offset; } @@ -652,7 +652,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - private long nextPullOffset(MessageQueue messageQueue) { + private long nextPullOffset(MessageQueue messageQueue) throws MQClientException { long offset = -1; long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue); if (seekOffset != -1) { @@ -739,7 +739,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return; } - long offset = nextPullOffset(messageQueue); + long offset = 0L; + try { + offset = nextPullOffset(messageQueue); + } catch (MQClientException e) { + log.error("get next pull offset failed, maybe timeout exception"); + return; + } + if (this.isCancelled() || processQueue.isDropped()) { return; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index e08f7800..52cd25fa 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -270,7 +270,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } else { if (processQueue.isLocked()) { if (!pullRequest.isLockedFirst()) { - final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); + long offset = -1L; + try { + offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); + } catch (MQClientException e) { + log.error("compute consume offset failed, maybe timeout exception"); + } boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index b8972a92..8d31314b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; @@ -373,7 +374,15 @@ public abstract class RebalanceImpl { this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); - long nextOffset = this.computePullFromWhere(mq); + + long nextOffset = 0L; + try { + nextOffset = this.computePullFromWhereWithException(mq); + } catch (MQClientException e) { + log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); + continue; + } + if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { @@ -408,8 +417,17 @@ public abstract class RebalanceImpl { public abstract void removeDirtyOffset(final MessageQueue mq); + /** + * When the network is unstable, using this interface may return wrong offset. + * It is recommended to use computePullFromWhereWithException instead. + * @param mq + * @return offset + */ + @Deprecated public abstract long computePullFromWhere(final MessageQueue mq); + public abstract long computePullFromWhereWithException(final MessageQueue mq) throws MQClientException; + public abstract void dispatchPullRequest(final List pullRequestList); public void removeProcessQueue(final MessageQueue mq) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 9d1ea749..286c684e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -74,8 +74,20 @@ public class RebalanceLitePullImpl extends RebalanceImpl { this.litePullConsumerImpl.getOffsetStore().removeOffset(mq); } + @Deprecated @Override public long computePullFromWhere(MessageQueue mq) { + long result = -1L; + try { + result = computePullFromWhereWithException(mq); + } catch (MQClientException e) { + log.warn("Compute consume offset exception, mq={}", mq); + } + return result; + } + + @Override + public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere(); long result = -1; switch (consumeFromWhere) { @@ -118,7 +130,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } else { try { @@ -126,7 +139,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java index 9dd408c1..6df7eb7f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -68,11 +69,17 @@ public class RebalancePullImpl extends RebalanceImpl { this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); } + @Deprecated @Override public long computePullFromWhere(MessageQueue mq) { return 0; } + @Override + public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { + return 0; + } + @Override public void dispatchPullRequest(List pullRequestList) { } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 95823916..666b696f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -137,8 +137,20 @@ public class RebalancePushImpl extends RebalanceImpl { this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); } + @Deprecated @Override public long computePullFromWhere(MessageQueue mq) { + long result = -1L; + try { + result = computePullFromWhereWithException(mq); + } catch (MQClientException e) { + log.warn("Compute consume offset exception, mq={}", mq); + } + return result; + } + + @Override + public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { long result = -1; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); @@ -159,7 +171,8 @@ public class RebalancePushImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else { @@ -187,7 +200,8 @@ public class RebalancePushImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } else { try { @@ -195,7 +209,8 @@ public class RebalancePushImpl extends RebalanceImpl { UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else { -- GitLab