diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 280de46ff2d6d8a09e202083e48948f38ce10b9c..1e6b8dc99699aa9236c9e8a3dea0825606b74e2f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -115,6 +115,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { */ private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000; + private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000; + private DefaultLitePullConsumer defaultLitePullConsumer; private final ConcurrentMap taskTable = @@ -626,9 +628,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - private long fetchConsumeOffset(MessageQueue messageQueue) { + private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException { checkServiceState(); - long offset = this.rebalanceImpl.computePullFromWhere(messageQueue); + long offset = this.rebalanceImpl.computePullFromWhereWithException(messageQueue); return offset; } @@ -652,7 +654,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - private long nextPullOffset(MessageQueue messageQueue) { + private long nextPullOffset(MessageQueue messageQueue) throws MQClientException { long offset = -1; long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue); if (seekOffset != -1) { @@ -739,7 +741,15 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return; } - long offset = nextPullOffset(messageQueue); + long offset = 0L; + try { + offset = nextPullOffset(messageQueue); + } catch (MQClientException e) { + log.error("Failed to get next pull offset", e); + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); + return; + } + if (this.isCancelled() || processQueue.isDropped()) { return; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index e08f78003044e5592aa27159635f4a18d86723f4..bb0b7f10436ad7ef0844bad9ec09849c72b5940c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -269,8 +269,15 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } else { if (processQueue.isLocked()) { - if (!pullRequest.isLockedFirst()) { - final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); + if (!pullRequest.isPreviouslyLocked()) { + long offset = -1L; + try { + offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); + } catch (MQClientException e) { + this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); + log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); + return; + } boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); @@ -279,7 +286,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { pullRequest, offset); } - pullRequest.setLockedFirst(true); + pullRequest.setPreviouslyLocked(true); pullRequest.setNextOffset(offset); } } else { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java index 10aded076252b7513b5eb42c67ebb28136f42930..bf03ec38c3a68d9a789504f3291b9b66d4c833cf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java @@ -23,14 +23,14 @@ public class PullRequest { private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; - private boolean lockedFirst = false; + private boolean previouslyLocked = false; - public boolean isLockedFirst() { - return lockedFirst; + public boolean isPreviouslyLocked() { + return previouslyLocked; } - public void setLockedFirst(boolean lockedFirst) { - this.lockedFirst = lockedFirst; + public void setPreviouslyLocked(boolean previouslyLocked) { + this.previouslyLocked = previouslyLocked; } public String getConsumerGroup() { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index b8972a92e8fdb3402b92c7c1b1a8015cb38a127b..833d465a4703184be6e9a17c20682fdd5d81b240 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; @@ -373,7 +374,15 @@ public abstract class RebalanceImpl { this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); - long nextOffset = this.computePullFromWhere(mq); + + long nextOffset = -1L; + try { + nextOffset = this.computePullFromWhereWithException(mq); + } catch (MQClientException e) { + log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); + continue; + } + if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { @@ -408,8 +417,17 @@ public abstract class RebalanceImpl { public abstract void removeDirtyOffset(final MessageQueue mq); + /** + * When the network is unstable, using this interface may return wrong offset. + * It is recommended to use computePullFromWhereWithException instead. + * @param mq + * @return offset + */ + @Deprecated public abstract long computePullFromWhere(final MessageQueue mq); + public abstract long computePullFromWhereWithException(final MessageQueue mq) throws MQClientException; + public abstract void dispatchPullRequest(final List pullRequestList); public void removeProcessQueue(final MessageQueue mq) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 9d1ea7492eea2ffc662defea11f4fe5d1bf84f38..286c684e43a52dc3759c0f3b9393e0ee75851ef4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -74,8 +74,20 @@ public class RebalanceLitePullImpl extends RebalanceImpl { this.litePullConsumerImpl.getOffsetStore().removeOffset(mq); } + @Deprecated @Override public long computePullFromWhere(MessageQueue mq) { + long result = -1L; + try { + result = computePullFromWhereWithException(mq); + } catch (MQClientException e) { + log.warn("Compute consume offset exception, mq={}", mq); + } + return result; + } + + @Override + public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere(); long result = -1; switch (consumeFromWhere) { @@ -118,7 +130,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } else { try { @@ -126,7 +139,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java index 9dd408c1d140ea4f7ce4aa31df5e5181c997b753..6df7eb7f00cb3141bda3cef444f6a88c7abeeadd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -68,11 +69,17 @@ public class RebalancePullImpl extends RebalanceImpl { this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); } + @Deprecated @Override public long computePullFromWhere(MessageQueue mq) { return 0; } + @Override + public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { + return 0; + } + @Override public void dispatchPullRequest(List pullRequestList) { } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 9582391622cf6d2c0555e6e0b4d75ca9b9fef48c..666b696ffa89a631243e2e6533cfa82a52ff383f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -137,8 +137,20 @@ public class RebalancePushImpl extends RebalanceImpl { this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); } + @Deprecated @Override public long computePullFromWhere(MessageQueue mq) { + long result = -1L; + try { + result = computePullFromWhereWithException(mq); + } catch (MQClientException e) { + log.warn("Compute consume offset exception, mq={}", mq); + } + return result; + } + + @Override + public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { long result = -1; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); @@ -159,7 +171,8 @@ public class RebalancePushImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else { @@ -187,7 +200,8 @@ public class RebalancePushImpl extends RebalanceImpl { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } else { try { @@ -195,7 +209,8 @@ public class RebalancePushImpl extends RebalanceImpl { UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { - result = -1; + log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); + throw e; } } } else { diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index c2c60824ec80e9f4f3e2eac99c95704739349ecd..6c8463542fa61e00e7f84fa980c5588dd827ceee 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.client.impl.consumer.PullMessageService; import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; +import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageDecoder; @@ -74,6 +75,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -87,6 +89,7 @@ public class DefaultMQPushConsumerTest { @Mock private MQClientAPIImpl mQClientAPIImpl; private RebalanceImpl rebalanceImpl; + private RebalancePushImpl rebalancePushImpl; private DefaultMQPushConsumer pushConsumer; @Before @@ -130,6 +133,7 @@ public class DefaultMQPushConsumerTest { }); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); + rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); // suppress updateTopicRouteInfoFromNameServer pushConsumer.changeInstanceNameToPID(); @@ -142,7 +146,7 @@ public class DefaultMQPushConsumerTest { doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); rebalanceImpl = spy(pushConsumerImpl.getRebalanceImpl()); - doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class)); + doReturn(123L).when(rebalanceImpl).computePullFromWhereWithException(any(MessageQueue.class)); FieldUtils.writeDeclaredField(pushConsumerImpl, "rebalanceImpl", rebalanceImpl, true); Set messageQueueSet = new HashSet(); @@ -262,7 +266,7 @@ public class DefaultMQPushConsumerTest { pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { countDownLatch.countDown(); try { Thread.sleep(1000); @@ -320,4 +324,21 @@ public class DefaultMQPushConsumerTest { } return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); } + + @Test + public void testPullMessage_ExceptionOccursWhenComputePullFromWhere() throws MQClientException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final MessageExt[] messageExts = new MessageExt[1]; + pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService( + new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), + (msgs, context) -> { + messageExts[0] = msgs.get(0); + return null; + })); + + pushConsumer.getDefaultMQPushConsumerImpl().setConsumeOrderly(true); + PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); + pullMessageService.executePullRequestImmediately(createPullRequest()); + assertThat(messageExts[0]).isNull(); + } }