diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 1804df20e094233b779c9decf939d7e5506d0b6a..39c43d592d7772d335e166fa88af6a8322fd9438 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -47,6 +47,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.logging.InternalLogger; @@ -227,11 +228,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; + boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), subscriptionData.getExpressionType(), - subscriptionData.getSubVersion(), + isTagType ? 0L : subscriptionData.getSubVersion(), offset, maxNums, sysFlag, @@ -455,11 +457,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; + boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), subscriptionData.getExpressionType(), - subscriptionData.getSubVersion(), + isTagType ? 0L : subscriptionData.getSubVersion(), offset, maxNums, sysFlag,