diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 280de46ff2d6d8a09e202083e48948f38ce10b9c..32b05d55e26d34c80400c2cd22f5dd08b5dfd8d6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -626,9 +626,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - private long fetchConsumeOffset(MessageQueue messageQueue) { + private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException { checkServiceState(); - long offset = this.rebalanceImpl.computePullFromWhere(messageQueue); + long offset = this.rebalanceImpl.computePullFromWhereWithException(messageQueue); return offset; } @@ -652,7 +652,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - private long nextPullOffset(MessageQueue messageQueue) { + private long nextPullOffset(MessageQueue messageQueue) throws MQClientException { long offset = -1; long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue); if (seekOffset != -1) { @@ -739,7 +739,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return; } - long offset = nextPullOffset(messageQueue); + long offset = 0L; + try { + offset = nextPullOffset(messageQueue); + } catch (MQClientException e) { + log.error("get next pull offset failed, maybe timeout exception"); + return; + } + if (this.isCancelled() || processQueue.isDropped()) { return; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index e08f78003044e5592aa27159635f4a18d86723f4..52cd25fa8aa88f303ca11c06b6369138d9f9bd82 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -270,7 +270,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } else { if (processQueue.isLocked()) { if (!pullRequest.isLockedFirst()) { - final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); + long offset = -1L; + try { + offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); + } catch (MQClientException e) { + log.error("compute consume offset failed, maybe timeout exception"); + } boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index b8972a92e8fdb3402b92c7c1b1a8015cb38a127b..8d31314b0f73e80d17f157180f5b74887d628592 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; @@ -373,7 +374,15 @@ public abstract class RebalanceImpl { this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); - long nextOffset = this.computePullFromWhere(mq); + + long nextOffset = 0L; + try { + nextOffset = this.computePullFromWhereWithException(mq); + } catch (MQClientException e) { + log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); + continue; + } + if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { @@ -408,8 +417,17 @@ public abstract class RebalanceImpl { public abstract void removeDirtyOffset(final MessageQueue mq); + /** + * When the network is unstable, using this interface may return wrong offset. + * It is recommended to use computePullFromWhereWithException instead. + * @param mq + * @return offset + */ + @Deprecated public abstract long computePullFromWhere(final MessageQueue mq); + public abstract long computePullFromWhereWithException(final MessageQueue mq) throws MQClientException; + public abstract void dispatchPullRequest(final List pullRequestList); public void removeProcessQueue(final MessageQueue mq) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 9d1ea7492eea2ffc662defea11f4fe5d1bf84f38..286c684e43a52dc3759c0f3b9393e0ee75851ef4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -74,8 +74,20 @@ public class RebalanceLitePullImpl extends RebalanceImpl { this.litePullConsumerImpl.getOffsetStore().removeOffset(mq); } + @Deprecated @Override public long computePullFromWhere(MessageQueue mq) { + long result = -1L; + try { + result = computePullFromWhereWithException(mq); + } catch (MQClientException e) { + log.warn("Compute consume offset exception, mq={}", mq); + } + return result; + } + + @Override + public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere(); long result = -1; switch (consumeFromWhere) { @@ -118,7 +130,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } else { try { @@ -126,7 +139,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java index 9dd408c1d140ea4f7ce4aa31df5e5181c997b753..6df7eb7f00cb3141bda3cef444f6a88c7abeeadd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -68,11 +69,17 @@ public class RebalancePullImpl extends RebalanceImpl { this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); } + @Deprecated @Override public long computePullFromWhere(MessageQueue mq) { return 0; } + @Override + public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { + return 0; + } + @Override public void dispatchPullRequest(List pullRequestList) { } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 9582391622cf6d2c0555e6e0b4d75ca9b9fef48c..666b696ffa89a631243e2e6533cfa82a52ff383f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -137,8 +137,20 @@ public class RebalancePushImpl extends RebalanceImpl { this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); } + @Deprecated @Override public long computePullFromWhere(MessageQueue mq) { + long result = -1L; + try { + result = computePullFromWhereWithException(mq); + } catch (MQClientException e) { + log.warn("Compute consume offset exception, mq={}", mq); + } + return result; + } + + @Override + public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { long result = -1; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); @@ -159,7 +171,8 @@ public class RebalancePushImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else { @@ -187,7 +200,8 @@ public class RebalancePushImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } else { try { @@ -195,7 +209,8 @@ public class RebalancePushImpl extends RebalanceImpl { UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else {