提交 82a59610 编写于 作者: M maowei.ymw 提交者: von gosling

Add integration test case

上级 11bb2672
......@@ -163,7 +163,6 @@ public class PullAPIWrapper {
this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
{
// check version
......
......@@ -1046,7 +1046,7 @@ public class MQClientInstance {
if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
return this.brokerVersionTable.get(brokerName).get(brokerAddr);
}
}else{
} else {
HeartbeatData heartbeatData = prepareHeartbeatData();
try {
int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000);
......
......@@ -17,12 +17,18 @@
package org.apache.rocketmq.test.client.consumer.filter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
......@@ -39,12 +45,14 @@ public class SqlFilterIT extends BaseConf {
private static Logger logger = Logger.getLogger(SqlFilterIT.class);
private RMQNormalProducer producer = null;
private String topic = null;
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
@Before
public void setUp() {
topic = initTopic();
logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic);
OFFSE_TABLE.clear();
}
@After
......@@ -71,4 +79,65 @@ public class SqlFilterIT extends BaseConf {
assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2);
}
@Test
public void testFilterPullConsumer() throws Exception {
int msgSize = 16;
String group = initConsumerGroup();
MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group);
consumer.setNamesrvAddr(nsAddr);
consumer.start();
Thread.sleep(3000);
producer.send("TagA", msgSize);
producer.send("TagB", msgSize);
producer.send("TagC", msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size());
List<String> receivedMessage = new ArrayList<>(2);
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
for (MessageQueue mq : mqs) {
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pull(mq, selector, getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgs = pullResult.getMsgFoundList();
for (MessageExt msg : msgs) {
receivedMessage.add(new String(msg.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
assertThat(receivedMessage.size()).isEqualTo(msgSize * 2);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册