diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 0929916062233f4ae776e9318835e30de27a7e01..a17109818151f3620fbeb616b7cb1ab6102dba0f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -280,7 +280,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { case SUSPEND_CURRENT_QUEUE_A_MOMENT: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); if (checkReconsumeTimes(msgs)) { - consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); + consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs); this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), @@ -312,7 +312,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { case SUSPEND_CURRENT_QUEUE_A_MOMENT: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); if (checkReconsumeTimes(msgs)) { - consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); + consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs); this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index ab585ea4c98e3592e6058a123d78cfc4f48bfe3d..a9dbc3157bad1a0d8003c49200d9e7c75a045720 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -348,12 +348,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { break; case NO_NEW_MSG: - pullRequest.setNextOffset(pullResult.getNextBeginOffset()); - - DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); - - DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); - break; case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index f659bd3f83100c5a1837f4ecadc7bbfdee9f63aa..4b9ea62c336743a5af35dbd85dfd31cb9c59fc19 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -280,7 +280,7 @@ public class ProcessQueue { return -1; } - public void makeMessageToCosumeAgain(List msgs) { + public void makeMessageToConsumeAgain(List msgs) { try { this.lockTreeMap.writeLock().lockInterruptibly(); try { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 231ac0c68f3e26d099de8318c62741d320e7cb57..cc42a9e830ee9dde11869423dd879cefdd9d967a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -194,7 +194,7 @@ public class PullAPIWrapper { String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { - brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); + brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr); } PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( @@ -223,7 +223,7 @@ public class PullAPIWrapper { return MixAll.MASTER_ID; } - private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) + private String computePullFromWhichFilterServer(final String topic, final String brokerAddr) throws MQClientException { ConcurrentMap topicRouteTable = this.mQClientFactory.getTopicRouteTable(); if (topicRouteTable != null) {