diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 676c03cef41ccc8891a170b954fe18b48743c189..f832370987ab32c7a83964bac2faad1f3795755e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -432,8 +432,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { throw new IllegalArgumentException("Topic can not be null or empty."); } setSubscriptionType(SubscriptionType.SUBSCRIBE); - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(), - topic, subExpression); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl); @@ -749,8 +748,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); } else { String topic = this.messageQueue.getTopic(); - subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(), - topic, SubscriptionData.SUB_ALL); + subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); } PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index afd72a08002c6c9124feb6260f011e9810407842..eed5fa43f04fff819345def276320a442ee3f9fc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -205,8 +205,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } try { - return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), - mq.getTopic(), subExpression); + return FilterAPI.buildSubscriptionData(mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); } @@ -301,8 +300,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public void subscriptionAutomatically(final String topic) { if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) { try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), - topic, SubscriptionData.SUB_ALL); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData); } catch (Exception ignore) { } @@ -365,7 +363,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { for (String t : topics) { SubscriptionData ms = null; try { - ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL); + ms = FilterAPI.buildSubscriptionData(t, SubscriptionData.SUB_ALL); } catch (Exception e) { log.error("parse subscription error", e); } @@ -742,8 +740,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { Set registerTopics = this.defaultMQPullConsumer.getRegisterTopics(); if (registerTopics != null) { for (final String topic : registerTopics) { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), - topic, SubscriptionData.SUB_ALL); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index a9dbc3157bad1a0d8003c49200d9e7c75a045720..e08f78003044e5592aa27159635f4a18d86723f4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -827,8 +827,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { for (final Map.Entry entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), - topic, subString); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } @@ -842,8 +841,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), - retryTopic, SubscriptionData.SUB_ALL); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: @@ -874,8 +872,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void subscribe(String topic, String subExpression) throws MQClientException { try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), - topic, subExpression); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); @@ -887,8 +884,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), - topic, "*"); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, "*"); subscriptionData.setSubString(fullClassName); subscriptionData.setClassFilterMode(true); subscriptionData.setFilterClassSource(filterClassSource); diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java index 9268a6ef45d0663b1c5a08b81efc93e6ef988741..c5b51b1aff57b17ee24d010230cfae64d0d9a0a3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java +++ b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java @@ -36,8 +36,7 @@ public class FilterAPI { return simple; } - public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, - String subString) throws Exception { + public static SubscriptionData buildSubscriptionData(String topic, String subString) throws Exception { SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); @@ -67,7 +66,7 @@ public class FilterAPI { public static SubscriptionData build(final String topic, final String subString, final String type) throws Exception { if (ExpressionType.TAG.equals(type) || type == null) { - return buildSubscriptionData(null, topic, subString); + return buildSubscriptionData(topic, subString); } if (subString == null || subString.length() < 1) { diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java index 73ab09e61c4cd8ac5b4524815506af316c340b80..5190f88f47b531051a3d65fa91b5165b9ffb13fa 100644 --- a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java +++ b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java @@ -33,7 +33,7 @@ public class FilterAPITest { @Test public void testBuildSubscriptionData() throws Exception { SubscriptionData subscriptionData = - FilterAPI.buildSubscriptionData(group, topic, subString); + FilterAPI.buildSubscriptionData(topic, subString); assertThat(subscriptionData.getTopic()).isEqualTo(topic); assertThat(subscriptionData.getSubString()).isEqualTo(subString); String[] tags = subString.split("\\|\\|");