diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index ac44df43557e1a73479aa54d75a093e08b84f927..1d2d24fa3b9d05aea77137fc33439a4bff3e777b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -163,7 +163,6 @@ public class PullAPIWrapper { this.recalculatePullFromWhichNode(mq), false); } - if (findBrokerResult != null) { { // check version diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 51722111588ae653575ab95a36c197881191ca47..80347d1052ed31f19abe8bb743d4b37ceb4a534d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1046,7 +1046,7 @@ public class MQClientInstance { if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) { return this.brokerVersionTable.get(brokerName).get(brokerAddr); } - }else{ + } else { HeartbeatData heartbeatData = prepareHeartbeatData(); try { int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java index 79f15dcc99da9ea04b20c756ffc12ddc03199b53..a0f6555ced072f0f28fb78bdff3bd7735a910cde 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java @@ -17,12 +17,18 @@ package org.apache.rocketmq.test.client.consumer.filter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.log4j.Logger; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.test.base.BaseConf; -import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; -import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT; -import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer; import org.apache.rocketmq.test.factory.ConsumerFactory; @@ -39,12 +45,14 @@ public class SqlFilterIT extends BaseConf { private static Logger logger = Logger.getLogger(SqlFilterIT.class); private RMQNormalProducer producer = null; private String topic = null; + private static final Map OFFSE_TABLE = new HashMap(); @Before public void setUp() { topic = initTopic(); logger.info(String.format("use topic: %s;", topic)); producer = getProducer(nsAddr, topic); + OFFSE_TABLE.clear(); } @After @@ -71,4 +79,65 @@ public class SqlFilterIT extends BaseConf { assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2); } + + @Test + public void testFilterPullConsumer() throws Exception { + int msgSize = 16; + + String group = initConsumerGroup(); + MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))"); + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group); + consumer.setNamesrvAddr(nsAddr); + consumer.start(); + Thread.sleep(3000); + producer.send("TagA", msgSize); + producer.send("TagB", msgSize); + producer.send("TagC", msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size()); + + List receivedMessage = new ArrayList<>(2); + Set mqs = consumer.fetchSubscribeMessageQueues(topic); + for (MessageQueue mq : mqs) { + SINGLE_MQ: + while (true) { + try { + PullResult pullResult = + consumer.pull(mq, selector, getMessageQueueOffset(mq), 32); + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + switch (pullResult.getPullStatus()) { + case FOUND: + List msgs = pullResult.getMsgFoundList(); + for (MessageExt msg : msgs) { + receivedMessage.add(new String(msg.getBody())); + } + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + break SINGLE_MQ; + case OFFSET_ILLEGAL: + break; + default: + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + assertThat(receivedMessage.size()).isEqualTo(msgSize * 2); + } + + private static long getMessageQueueOffset(MessageQueue mq) { + Long offset = OFFSE_TABLE.get(mq); + if (offset != null) + return offset; + + return 0; + } + + private static void putMessageQueueOffset(MessageQueue mq, long offset) { + OFFSE_TABLE.put(mq, offset); + } }