提交 f619e451 编写于 作者: S shroman

Fixed typo.

上级 c4a3e0c1
...@@ -33,12 +33,12 @@ public class RMQNormalConsumer extends AbstractMQConsumer { ...@@ -33,12 +33,12 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
super(nsAddr, topic, subExpression, consumerGroup, listner); super(nsAddr, topic, subExpression, consumerGroup, listner);
} }
public AbstractListener getListner() { public AbstractListener getListener() {
return listner; return listener;
} }
public void setListner(AbstractListener listner) { public void setListener(AbstractListener listener) {
this.listner = listner; this.listener = listener;
} }
public void create() { public void create() {
...@@ -51,7 +51,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer { ...@@ -51,7 +51,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
logger.error("consumer subscribe failed!"); logger.error("consumer subscribe failed!");
e.printStackTrace(); e.printStackTrace();
} }
consumer.setMessageListener(listner); consumer.setMessageListener(listener);
} }
public void start() { public void start() {
...@@ -79,7 +79,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer { ...@@ -79,7 +79,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
@Override @Override
public void clearMsg() { public void clearMsg() {
this.listner.clearMsg(); this.listener.clearMsg();
} }
public void restart() { public void restart() {
......
...@@ -20,7 +20,7 @@ package org.apache.rocketmq.test.clientinterface; ...@@ -20,7 +20,7 @@ package org.apache.rocketmq.test.clientinterface;
import org.apache.rocketmq.test.listener.AbstractListener; import org.apache.rocketmq.test.listener.AbstractListener;
public abstract class AbstractMQConsumer implements MQConsumer { public abstract class AbstractMQConsumer implements MQConsumer {
protected AbstractListener listner = null; protected AbstractListener listener = null;
protected String nsAddr = null; protected String nsAddr = null;
protected String topic = null; protected String topic = null;
protected String subExpression = null; protected String subExpression = null;
...@@ -31,11 +31,11 @@ public abstract class AbstractMQConsumer implements MQConsumer { ...@@ -31,11 +31,11 @@ public abstract class AbstractMQConsumer implements MQConsumer {
} }
public AbstractMQConsumer(String nsAddr, String topic, String subExpression, public AbstractMQConsumer(String nsAddr, String topic, String subExpression,
String consumerGroup, AbstractListener listner) { String consumerGroup, AbstractListener listener) {
this.topic = topic; this.topic = topic;
this.subExpression = subExpression; this.subExpression = subExpression;
this.consumerGroup = consumerGroup; this.consumerGroup = consumerGroup;
this.listner = listner; this.listener = listener;
this.nsAddr = nsAddr; this.nsAddr = nsAddr;
} }
...@@ -45,16 +45,16 @@ public abstract class AbstractMQConsumer implements MQConsumer { ...@@ -45,16 +45,16 @@ public abstract class AbstractMQConsumer implements MQConsumer {
} }
public void setDebug() { public void setDebug() {
if (listner != null) { if (listener != null) {
listner.setDebug(true); listener.setDebug(true);
} }
isDebug = true; isDebug = true;
} }
public void setDebug(boolean isDebug) { public void setDebug(boolean isDebug) {
if (listner != null) { if (listener != null) {
listner.setDebug(isDebug); listener.setDebug(isDebug);
} }
this.isDebug = isDebug; this.isDebug = isDebug;
...@@ -65,12 +65,12 @@ public abstract class AbstractMQConsumer implements MQConsumer { ...@@ -65,12 +65,12 @@ public abstract class AbstractMQConsumer implements MQConsumer {
this.subExpression = subExpression; this.subExpression = subExpression;
} }
public AbstractListener getListner() { public AbstractListener getListener() {
return listner; return listener;
} }
public void setListner(AbstractListener listner) { public void setListener(AbstractListener listener) {
this.listner = listner; this.listener = listener;
} }
public String getNsAddr() { public String getNsAddr() {
...@@ -106,7 +106,7 @@ public abstract class AbstractMQConsumer implements MQConsumer { ...@@ -106,7 +106,7 @@ public abstract class AbstractMQConsumer implements MQConsumer {
} }
public void clearMsg() { public void clearMsg() {
listner.clearMsg(); listener.clearMsg();
} }
} }
...@@ -30,16 +30,16 @@ import org.apache.rocketmq.test.util.TestUtil; ...@@ -30,16 +30,16 @@ import org.apache.rocketmq.test.util.TestUtil;
public class AbstractListener extends MQCollector implements MessageListener { public class AbstractListener extends MQCollector implements MessageListener {
public static Logger logger = Logger.getLogger(AbstractListener.class); public static Logger logger = Logger.getLogger(AbstractListener.class);
protected boolean isDebug = false; protected boolean isDebug = false;
protected String listnerName = null; protected String listenerName = null;
protected Collection<Object> allSendMsgs = null; protected Collection<Object> allSendMsgs = null;
public AbstractListener() { public AbstractListener() {
super(); super();
} }
public AbstractListener(String listnerName) { public AbstractListener(String listenerName) {
super(); super();
this.listnerName = listnerName; this.listenerName = listenerName;
} }
public AbstractListener(String originMsgCollector, String msgBodyCollector) { public AbstractListener(String originMsgCollector, String msgBodyCollector) {
...@@ -82,10 +82,10 @@ public class AbstractListener extends MQCollector implements MessageListener { ...@@ -82,10 +82,10 @@ public class AbstractListener extends MQCollector implements MessageListener {
} else { } else {
if (System.currentTimeMillis() - curTime >= timeoutMills) { if (System.currentTimeMillis() - curTime >= timeoutMills) {
logger.error(String.format("timeout but [%s] not recv all send messages!", logger.error(String.format("timeout but [%s] not recv all send messages!",
listnerName)); listenerName));
break; break;
} else { } else {
logger.info(String.format("[%s] still [%s] msg not recv!", listnerName, logger.info(String.format("[%s] still [%s] msg not recv!", listenerName,
sendMsgs.size())); sendMsgs.size()));
TestUtil.waitForMonment(500); TestUtil.waitForMonment(500);
} }
...@@ -105,10 +105,10 @@ public class AbstractListener extends MQCollector implements MessageListener { ...@@ -105,10 +105,10 @@ public class AbstractListener extends MQCollector implements MessageListener {
} }
if (System.currentTimeMillis() - curTime >= timeoutMills) { if (System.currentTimeMillis() - curTime >= timeoutMills) {
logger.error(String.format("timeout but [%s] not recv all send messages!", logger.error(String.format("timeout but [%s] not recv all send messages!",
listnerName)); listenerName));
break; break;
} else { } else {
logger.info(String.format("[%s] still [%s] msg not recv!", listnerName, logger.info(String.format("[%s] still [%s] msg not recv!", listenerName,
size - msgBodys.getDataSize())); size - msgBodys.getDataSize()));
TestUtil.waitForMonment(500); TestUtil.waitForMonment(500);
} }
......
...@@ -49,7 +49,7 @@ public class RMQDelayListner extends AbstractListener implements MessageListener ...@@ -49,7 +49,7 @@ public class RMQDelayListner extends AbstractListener implements MessageListener
long recvTime = System.currentTimeMillis(); long recvTime = System.currentTimeMillis();
for (MessageExt msg : msgs) { for (MessageExt msg : msgs) {
if (isDebug) { if (isDebug) {
logger.info(listnerName + ":" + msg); logger.info(listenerName + ":" + msg);
} }
msgBodys.addData(new String(msg.getBody())); msgBodys.addData(new String(msg.getBody()));
......
...@@ -51,8 +51,8 @@ public class RMQNormalListner extends AbstractListener implements MessageListene ...@@ -51,8 +51,8 @@ public class RMQNormalListner extends AbstractListener implements MessageListene
for (MessageExt msg : msgs) { for (MessageExt msg : msgs) {
msgIndex.getAndIncrement(); msgIndex.getAndIncrement();
if (isDebug) { if (isDebug) {
if (listnerName != null && listnerName != "") { if (listenerName != null && listenerName != "") {
logger.info(listnerName + ":" + msgIndex.get() + ":" logger.info(listenerName + ":" + msgIndex.get() + ":"
+ String.format("msgid:%s broker:%s queueId:%s offset:%s", + String.format("msgid:%s broker:%s queueId:%s offset:%s",
msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(),
msg.getQueueOffset())); msg.getQueueOffset()));
......
...@@ -68,8 +68,8 @@ public class RMQOrderListener extends AbstractListener implements MessageListene ...@@ -68,8 +68,8 @@ public class RMQOrderListener extends AbstractListener implements MessageListene
ConsumeOrderlyContext context) { ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) { for (MessageExt msg : msgs) {
if (isDebug) { if (isDebug) {
if (listnerName != null && listnerName != "") { if (listenerName != null && listenerName != "") {
logger.info(listnerName + ": " + msg); logger.info(listenerName + ": " + msg);
} else { } else {
logger.info(msg); logger.info(msg);
} }
......
...@@ -57,22 +57,22 @@ public class NormalMsgDynamicBalanceIT extends BaseConf { ...@@ -57,22 +57,22 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
producer.send(msgSize); producer.send(msgSize);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner()); consumer2.getListener());
consumer2.shutdown(); consumer2.shutdown();
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils.verifyBalance(msgSize, boolean balance = VerifyUtils.verifyBalance(msgSize,
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody()).size() - msgSize, consumer1.getListener().getAllUndupMsgBody()).size() - msgSize,
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size()); consumer2.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true); assertThat(balance).isEqualTo(true);
} }
...@@ -87,8 +87,8 @@ public class NormalMsgDynamicBalanceIT extends BaseConf { ...@@ -87,8 +87,8 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
producer.send(msgSize); producer.send(msgSize);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner(), consumer3.getListner()); consumer2.getListener(), consumer3.getListener());
consumer3.shutdown(); consumer3.shutdown();
producer.clearMsg(); producer.clearMsg();
consumer1.clearMsg(); consumer1.clearMsg();
...@@ -98,14 +98,14 @@ public class NormalMsgDynamicBalanceIT extends BaseConf { ...@@ -98,14 +98,14 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils.verifyBalance(msgSize, boolean balance = VerifyUtils.verifyBalance(msgSize,
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody()).size(), consumer1.getListener().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size()); consumer2.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true); assertThat(balance).isEqualTo(true);
} }
} }
...@@ -61,14 +61,14 @@ public class NormalMsgStaticBalanceIT extends BaseConf { ...@@ -61,14 +61,14 @@ public class NormalMsgStaticBalanceIT extends BaseConf {
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils.verifyBalance(msgSize, boolean balance = VerifyUtils.verifyBalance(msgSize,
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody()).size(), consumer1.getListener().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size()); consumer2.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true); assertThat(balance).isEqualTo(true);
} }
...@@ -88,22 +88,22 @@ public class NormalMsgStaticBalanceIT extends BaseConf { ...@@ -88,22 +88,22 @@ public class NormalMsgStaticBalanceIT extends BaseConf {
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner(), consumer3.getListner(), consumer1.getListener(), consumer2.getListener(), consumer3.getListener(),
consumer4.getListner()); consumer4.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils boolean balance = VerifyUtils
.verifyBalance(msgSize, .verifyBalance(msgSize,
VerifyUtils VerifyUtils
.getFilterdMessage(producer.getAllMsgBody(), .getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody()) consumer1.getListener().getAllUndupMsgBody())
.size(), .size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size(), consumer2.getListener().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer3.getListner().getAllUndupMsgBody()).size(), consumer3.getListener().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer4.getListner().getAllUndupMsgBody()).size()); consumer4.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true); assertThat(balance).isEqualTo(true);
} }
} }
...@@ -60,14 +60,14 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT { ...@@ -60,14 +60,14 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2")); consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), waitTime); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), waitTime);
assertThat(consumer2.getListner().getAllMsgBody().size()).isEqualTo(0); assertThat(consumer2.getListener().getAllMsgBody().size()).isEqualTo(0);
} }
} }
...@@ -64,13 +64,13 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT { ...@@ -64,13 +64,13 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllMsgBody())) consumer2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
consumer2.shutdown(); consumer2.shutdown();
...@@ -81,9 +81,9 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT { ...@@ -81,9 +81,9 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
......
...@@ -63,10 +63,10 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT { ...@@ -63,10 +63,10 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT {
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -62,9 +62,9 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT { ...@@ -62,9 +62,9 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
producer.clearMsg(); producer.clearMsg();
...@@ -76,13 +76,13 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT { ...@@ -76,13 +76,13 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllMsgBody())) consumer2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -65,14 +65,14 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT { ...@@ -65,14 +65,14 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT {
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllMsgBody())) consumer2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -64,14 +64,14 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT { ...@@ -64,14 +64,14 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT {
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllMsgBody())) consumer2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
......
...@@ -64,12 +64,12 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT { ...@@ -64,12 +64,12 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
} }
...@@ -65,14 +65,14 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT { ...@@ -65,14 +65,14 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT {
producer.clearMsg(); producer.clearMsg();
producer.send(tag1, msgSize); producer.send(tag1, msgSize);
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllMsgBody())) consumer2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -62,14 +62,14 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT { ...@@ -62,14 +62,14 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT {
producer.send(tag, msgSize); producer.send(tag, msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllMsgBody())) consumer2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -62,14 +62,14 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT { ...@@ -62,14 +62,14 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT {
producer.send(tag, msgSize); producer.send(tag, msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllMsgBody())) consumer2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -65,11 +65,11 @@ public class DynamicAddAndCrashIT extends BaseConf { ...@@ -65,11 +65,11 @@ public class DynamicAddAndCrashIT extends BaseConf {
asyncDefaultMQProducer.waitSendAll(waitTime * 6); asyncDefaultMQProducer.waitSendAll(waitTime * 6);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner()); consumer2.getListener());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
} }
...@@ -93,11 +93,11 @@ public class DynamicAddAndCrashIT extends BaseConf { ...@@ -93,11 +93,11 @@ public class DynamicAddAndCrashIT extends BaseConf {
asyncDefaultMQProducer.waitSendAll(waitTime * 6); asyncDefaultMQProducer.waitSendAll(waitTime * 6);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner(), consumer3.getListner()); consumer2.getListener(), consumer3.getListener());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner(), consumer3.getListner()); consumer1.getListener(), consumer2.getListener(), consumer3.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
} }
} }
...@@ -63,11 +63,11 @@ public class DynamicAddConsumerIT extends BaseConf { ...@@ -63,11 +63,11 @@ public class DynamicAddConsumerIT extends BaseConf {
asyncDefaultMQProducer.waitSendAll(waitTime * 6); asyncDefaultMQProducer.waitSendAll(waitTime * 6);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner()); consumer2.getListener());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
} }
...@@ -87,11 +87,11 @@ public class DynamicAddConsumerIT extends BaseConf { ...@@ -87,11 +87,11 @@ public class DynamicAddConsumerIT extends BaseConf {
asyncDefaultMQProducer.waitSendAll(waitTime * 6); asyncDefaultMQProducer.waitSendAll(waitTime * 6);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner(), consumer3.getListner()); consumer2.getListener(), consumer3.getListener());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner(), consumer3.getListner()); consumer1.getListener(), consumer2.getListener(), consumer3.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
} }
} }
...@@ -64,11 +64,11 @@ public class DynamicCrashConsumerIT extends BaseConf { ...@@ -64,11 +64,11 @@ public class DynamicCrashConsumerIT extends BaseConf {
asyncDefaultMQProducer.waitSendAll(waitTime * 6); asyncDefaultMQProducer.waitSendAll(waitTime * 6);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner()); consumer2.getListener());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
} }
...@@ -90,11 +90,11 @@ public class DynamicCrashConsumerIT extends BaseConf { ...@@ -90,11 +90,11 @@ public class DynamicCrashConsumerIT extends BaseConf {
asyncDefaultMQProducer.waitSendAll(waitTime * 6); asyncDefaultMQProducer.waitSendAll(waitTime * 6);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner(), consumer3.getListner()); consumer2.getListener(), consumer3.getListener());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner(), consumer3.getListner()); consumer1.getListener(), consumer2.getListener(), consumer3.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
} }
} }
...@@ -20,9 +20,6 @@ package org.apache.rocketmq.test.client.consumer.filter; ...@@ -20,9 +20,6 @@ package org.apache.rocketmq.test.client.consumer.filter;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer; import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory; import org.apache.rocketmq.test.factory.ConsumerFactory;
...@@ -64,11 +61,11 @@ public class SqlFilterIT extends BaseConf { ...@@ -64,11 +61,11 @@ public class SqlFilterIT extends BaseConf {
producer.send("TagB", msgSize); producer.send("TagB", msgSize);
producer.send("TagC", msgSize); producer.send("TagC", msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(msgSize * 2, consumeTime); consumer.getListener().waitForMessageConsume(msgSize * 2, consumeTime);
assertThat(producer.getAllMsgBody()) assertThat(producer.getAllMsgBody())
.containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), .containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())); consumer.getListener().getAllMsgBody()));
assertThat(consumer.getListner().getAllMsgBody().size()).isEqualTo(msgSize * 2); assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2);
} }
} }
...@@ -61,10 +61,10 @@ public class MulTagSubIT extends BaseConf { ...@@ -61,10 +61,10 @@ public class MulTagSubIT extends BaseConf {
new RMQNormalListner()); new RMQNormalListner());
producer.send(tag, msgSize); producer.send(tag, msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -83,10 +83,10 @@ public class MulTagSubIT extends BaseConf { ...@@ -83,10 +83,10 @@ public class MulTagSubIT extends BaseConf {
producer.send(tag2Msgs); producer.send(tag2Msgs);
Assert.assertEquals("Not all sent succeeded", msgSize * 2, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize * 2, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs), consumer.getListener().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs),
consumeTime); consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs)); .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs));
} }
...@@ -104,10 +104,10 @@ public class MulTagSubIT extends BaseConf { ...@@ -104,10 +104,10 @@ public class MulTagSubIT extends BaseConf {
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length, Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
producer.getAllUndupMsgBody().size()); producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(tagMessage.getAllTagMessageBody()); .containsExactlyElementsIn(tagMessage.getAllTagMessageBody());
} }
...@@ -125,11 +125,11 @@ public class MulTagSubIT extends BaseConf { ...@@ -125,11 +125,11 @@ public class MulTagSubIT extends BaseConf {
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length, Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
producer.getAllUndupMsgBody().size()); producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume( consumer.getListener().waitForMessageConsume(
tagMessage.getMessageBodyByTag(tags[0], tags[1]), consumeTime); tagMessage.getMessageBodyByTag(tags[0], tags[1]), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())).containsExactlyElementsIn( consumer.getListener().getAllMsgBody())).containsExactlyElementsIn(
tagMessage.getMessageBodyByTag(tags[0], tags[1])); tagMessage.getMessageBodyByTag(tags[0], tags[1]));
} }
...@@ -150,7 +150,7 @@ public class MulTagSubIT extends BaseConf { ...@@ -150,7 +150,7 @@ public class MulTagSubIT extends BaseConf {
TestUtils.waitForSeconds(5); TestUtils.waitForSeconds(5);
assertThat(VerifyUtils assertThat(VerifyUtils
.getFilterdMessage(producer.getAllMsgBody(), consumer.getListner().getAllMsgBody()) .getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody())
.size()).isEqualTo(0); .size()).isEqualTo(0);
} }
} }
...@@ -57,9 +57,9 @@ public class TagMessageWith1ConsumerIT extends BaseConf { ...@@ -57,9 +57,9 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListner()); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListner());
producer.send(tag, msgSize); producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -71,10 +71,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf { ...@@ -71,10 +71,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
new RMQNormalListner()); new RMQNormalListner());
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -87,10 +87,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf { ...@@ -87,10 +87,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
new RMQNormalListner()); new RMQNormalListner());
producer.send(tag, msgSize); producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -103,10 +103,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf { ...@@ -103,10 +103,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
new RMQNormalListner()); new RMQNormalListner());
producer.send(tag, msgSize); producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -119,10 +119,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf { ...@@ -119,10 +119,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
new RMQNormalListner()); new RMQNormalListner());
producer.send(tag, msgSize); producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -142,10 +142,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf { ...@@ -142,10 +142,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
producer.send(tag2Msgs); producer.send(tag2Msgs);
producer.send(10); producer.send(10);
Assert.assertEquals("Not all are sent", msgSize * 3, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize * 3, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -164,10 +164,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf { ...@@ -164,10 +164,10 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
producer.send(tag1Msgs); producer.send(tag1Msgs);
producer.send(tag2Msgs); producer.send(tag2Msgs);
Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -187,11 +187,11 @@ public class TagMessageWith1ConsumerIT extends BaseConf { ...@@ -187,11 +187,11 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
producer.send(tag2Msgs); producer.send(tag2Msgs);
producer.send(10); producer.send(10);
Assert.assertEquals("Not all are sent", msgSize * 3, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize * 3, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs), consumer.getListener().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs),
consumeTime); consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs)); .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs));
} }
} }
...@@ -69,16 +69,16 @@ public class TagMessageWithMulConsumerIT extends BaseConf { ...@@ -69,16 +69,16 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
producer.send(tag2Msgs); producer.send(tag2Msgs);
Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size());
consumerTag1.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag1Msgs), consumerTag1.getListener().waitForMessageConsume(MQMessageFactory.getMessageBody(tag1Msgs),
consumeTime); consumeTime);
consumerTag2.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs), consumerTag2.getListener().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs),
consumeTime); consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerTag1.getListner().getAllMsgBody())) consumerTag1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag1Msgs)); .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag1Msgs));
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerTag2.getListner().getAllMsgBody())) consumerTag2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs)); .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs));
} }
...@@ -98,16 +98,16 @@ public class TagMessageWithMulConsumerIT extends BaseConf { ...@@ -98,16 +98,16 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
Assert.assertEquals("Not all are sent", msgSize * tags.length, Assert.assertEquals("Not all are sent", msgSize * tags.length,
producer.getAllUndupMsgBody().size()); producer.getAllUndupMsgBody().size());
consumerTag1.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), consumerTag1.getListener().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]),
consumeTime); consumeTime);
consumerTag2.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[1]), consumerTag2.getListener().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[1]),
consumeTime); consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerTag1.getListner().getAllMsgBody())) consumerTag1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0])); .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0]));
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerTag2.getListner().getAllMsgBody())) consumerTag2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[1])); .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[1]));
} }
...@@ -129,16 +129,16 @@ public class TagMessageWithMulConsumerIT extends BaseConf { ...@@ -129,16 +129,16 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
Assert.assertEquals("Not all are sent", msgSize * tags.length, Assert.assertEquals("Not all are sent", msgSize * tags.length,
producer.getAllUndupMsgBody().size()); producer.getAllUndupMsgBody().size());
consumerTag1.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags), consumerTag1.getListener().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags),
consumeTime); consumeTime);
consumerTag2.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), consumerTag2.getListener().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]),
consumeTime); consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerTag1.getListner().getAllMsgBody())) consumerTag1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(tagMessage.getAllTagMessageBody()); .containsExactlyElementsIn(tagMessage.getAllTagMessageBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerTag2.getListner().getAllMsgBody())) consumerTag2.getListener().getAllMsgBody()))
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0])); .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0]));
} }
...@@ -169,27 +169,27 @@ public class TagMessageWithMulConsumerIT extends BaseConf { ...@@ -169,27 +169,27 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
producer.send(tagMsgs); producer.send(tagMsgs);
Assert.assertEquals("Not all are sent", msgSize * 3, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize * 3, producer.getAllUndupMsgBody().size());
consumerSubTwoMatchAll.getListner() consumerSubTwoMatchAll.getListener()
.waitForMessageConsume(tagMessage.getMessageBodyByTag(tags), consumeTime); .waitForMessageConsume(tagMessage.getMessageBodyByTag(tags), consumeTime);
consumerSubTwoMachieOne.getListner() consumerSubTwoMachieOne.getListener()
.waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), consumeTime); .waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), consumeTime);
consumerSubTag1.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), consumerSubTag1.getListener().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]),
consumeTime); consumeTime);
consumerSubAll.getListner().waitForMessageConsume( consumerSubAll.getListener().waitForMessageConsume(
MQMessageFactory.getMessage(msgsWithNoTag, tagMessage.getAllTagMessageBody()), MQMessageFactory.getMessage(msgsWithNoTag, tagMessage.getAllTagMessageBody()),
consumeTime); consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerSubTwoMatchAll.getListner().getAllMsgBody())) consumerSubTwoMatchAll.getListener().getAllMsgBody()))
.containsExactlyElementsIn(tagMessage.getAllTagMessageBody()); .containsExactlyElementsIn(tagMessage.getAllTagMessageBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerSubTwoMachieOne.getListner().getAllMsgBody())) consumerSubTwoMachieOne.getListener().getAllMsgBody()))
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0])); .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0]));
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerSubTag1.getListner().getAllMsgBody())) consumerSubTag1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0])); .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0]));
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumerSubAll.getListner().getAllMsgBody())) consumerSubAll.getListener().getAllMsgBody()))
.containsExactlyElementsIn(MQMessageFactory.getMessage(msgsWithNoTag, .containsExactlyElementsIn(MQMessageFactory.getMessage(msgsWithNoTag,
tagMessage.getAllTagMessageBody())); tagMessage.getAllTagMessageBody()));
} }
......
...@@ -61,10 +61,10 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -61,10 +61,10 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
new RMQNormalListner(originMsgDCName, msgBodyDCName)); new RMQNormalListner(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize); producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -82,9 +82,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -82,9 +82,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
new RMQNormalListner(originMsgDCName, msgBodyDCName)); new RMQNormalListner(originMsgDCName, msgBodyDCName));
TestUtils.waitForMoment(5); TestUtils.waitForMoment(5);
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -105,9 +105,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -105,9 +105,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
mqClients.remove(1); mqClients.remove(1);
TestUtils.waitForMoment(5); TestUtils.waitForMoment(5);
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody())) consumer1.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -59,7 +59,7 @@ public class MulConsumerMulTopicIT extends BaseConf { ...@@ -59,7 +59,7 @@ public class MulConsumerMulTopicIT extends BaseConf {
Assert.assertEquals("Not all sent succeeded", msgSize * 2, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize * 2, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
} }
...@@ -80,7 +80,7 @@ public class MulConsumerMulTopicIT extends BaseConf { ...@@ -80,7 +80,7 @@ public class MulConsumerMulTopicIT extends BaseConf {
Assert.assertEquals("Not all sent succeeded", msgSize * 2, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize * 2, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
} }
...@@ -102,7 +102,7 @@ public class MulConsumerMulTopicIT extends BaseConf { ...@@ -102,7 +102,7 @@ public class MulConsumerMulTopicIT extends BaseConf {
producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag1)); producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag1));
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
} }
} }
...@@ -55,9 +55,9 @@ public class OneConsumerMulTopicIT extends BaseConf { ...@@ -55,9 +55,9 @@ public class OneConsumerMulTopicIT extends BaseConf {
producer.send(MQMessageFactory.getMsg(topic2, msgSize)); producer.send(MQMessageFactory.getMsg(topic2, msgSize));
Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -74,9 +74,9 @@ public class OneConsumerMulTopicIT extends BaseConf { ...@@ -74,9 +74,9 @@ public class OneConsumerMulTopicIT extends BaseConf {
producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag)); producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag));
Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
...@@ -96,9 +96,9 @@ public class OneConsumerMulTopicIT extends BaseConf { ...@@ -96,9 +96,9 @@ public class OneConsumerMulTopicIT extends BaseConf {
producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag1)); producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag1));
Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -59,12 +59,12 @@ public class AsyncSendWithMessageQueueIT extends BaseConf { ...@@ -59,12 +59,12 @@ public class AsyncSendWithMessageQueueIT extends BaseConf {
producer.waitForResponse(10 * 1000); producer.waitForResponse(10 * 1000);
assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); VerifyUtils.verifyMessageQueueId(queueId, consumer.getListener().getAllOriginMsg());
producer.clearMsg(); producer.clearMsg();
consumer.clearMsg(); consumer.clearMsg();
...@@ -74,11 +74,11 @@ public class AsyncSendWithMessageQueueIT extends BaseConf { ...@@ -74,11 +74,11 @@ public class AsyncSendWithMessageQueueIT extends BaseConf {
producer.waitForResponse(10 * 1000); producer.waitForResponse(10 * 1000);
assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); VerifyUtils.verifyMessageQueueId(queueId, consumer.getListener().getAllOriginMsg());
} }
} }
...@@ -71,12 +71,12 @@ public class AsyncSendWithMessageQueueSelectorIT extends BaseConf { ...@@ -71,12 +71,12 @@ public class AsyncSendWithMessageQueueSelectorIT extends BaseConf {
producer.waitForResponse(5 * 1000); producer.waitForResponse(5 * 1000);
assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); VerifyUtils.verifyMessageQueueId(queueId, consumer.getListener().getAllOriginMsg());
producer.clearMsg(); producer.clearMsg();
consumer.clearMsg(); consumer.clearMsg();
...@@ -96,11 +96,11 @@ public class AsyncSendWithMessageQueueSelectorIT extends BaseConf { ...@@ -96,11 +96,11 @@ public class AsyncSendWithMessageQueueSelectorIT extends BaseConf {
producer.waitForResponse(5 * 1000); producer.waitForResponse(5 * 1000);
assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); VerifyUtils.verifyMessageQueueId(queueId, consumer.getListener().getAllOriginMsg());
} }
} }
...@@ -55,9 +55,9 @@ public class AsyncSendWithOnlySendCallBackIT extends BaseConf { ...@@ -55,9 +55,9 @@ public class AsyncSendWithOnlySendCallBackIT extends BaseConf {
producer.waitForResponse(10 * 1000); producer.waitForResponse(10 * 1000);
assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
......
...@@ -63,10 +63,10 @@ public class MessageUserPropIT extends BaseConf { ...@@ -63,10 +63,10 @@ public class MessageUserPropIT extends BaseConf {
producer.send(msg, null); producer.send(msg, null);
assertThat(producer.getAllMsgBody().size()).isEqualTo(1); assertThat(producer.getAllMsgBody().size()).isEqualTo(1);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
Message sendMsg = (Message) producer.getFirstMsg(); Message sendMsg = (Message) producer.getFirstMsg();
Message recvMsg = (Message) consumer.getListner().getFirstMsg(); Message recvMsg = (Message) consumer.getListener().getFirstMsg();
assertThat(recvMsg.getUserProperty(msgKey)).isEqualTo(sendMsg.getUserProperty(msgKey)); assertThat(recvMsg.getUserProperty(msgKey)).isEqualTo(sendMsg.getUserProperty(msgKey));
} }
...@@ -85,10 +85,10 @@ public class MessageUserPropIT extends BaseConf { ...@@ -85,10 +85,10 @@ public class MessageUserPropIT extends BaseConf {
producer.send(msg, null); producer.send(msg, null);
assertThat(producer.getAllMsgBody().size()).isEqualTo(1); assertThat(producer.getAllMsgBody().size()).isEqualTo(1);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
Message sendMsg = (Message) producer.getFirstMsg(); Message sendMsg = (Message) producer.getFirstMsg();
Message recvMsg = (Message) consumer.getListner().getFirstMsg(); Message recvMsg = (Message) consumer.getListener().getFirstMsg();
assertThat(recvMsg.getUserProperty(msgKey)).isEqualTo(sendMsg.getUserProperty(msgKey)); assertThat(recvMsg.getUserProperty(msgKey)).isEqualTo(sendMsg.getUserProperty(msgKey));
} }
} }
...@@ -56,9 +56,9 @@ public class OneWaySendIT extends BaseConf { ...@@ -56,9 +56,9 @@ public class OneWaySendIT extends BaseConf {
producer.waitForResponse(5 * 1000); producer.waitForResponse(5 * 1000);
assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize); assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -59,9 +59,9 @@ public class OneWaySendWithMQIT extends BaseConf { ...@@ -59,9 +59,9 @@ public class OneWaySendWithMQIT extends BaseConf {
producer.sendOneWay(msgSize, mq); producer.sendOneWay(msgSize, mq);
producer.waitForResponse(5 * 1000); producer.waitForResponse(5 * 1000);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
producer.clearMsg(); producer.clearMsg();
...@@ -71,9 +71,9 @@ public class OneWaySendWithMQIT extends BaseConf { ...@@ -71,9 +71,9 @@ public class OneWaySendWithMQIT extends BaseConf {
producer.asyncSend(msgSize, mq); producer.asyncSend(msgSize, mq);
producer.waitForResponse(5 * 1000); producer.waitForResponse(5 * 1000);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
...@@ -71,12 +71,12 @@ public class OneWaySendWithSelectorIT extends BaseConf { ...@@ -71,12 +71,12 @@ public class OneWaySendWithSelectorIT extends BaseConf {
}); });
assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize); assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); VerifyUtils.verifyMessageQueueId(queueId, consumer.getListener().getAllOriginMsg());
producer.clearMsg(); producer.clearMsg();
consumer.clearMsg(); consumer.clearMsg();
...@@ -94,11 +94,11 @@ public class OneWaySendWithSelectorIT extends BaseConf { ...@@ -94,11 +94,11 @@ public class OneWaySendWithSelectorIT extends BaseConf {
}); });
assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize); assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); VerifyUtils.verifyMessageQueueId(queueId, consumer.getListener().getAllOriginMsg());
} }
} }
...@@ -63,20 +63,20 @@ public class OrderMsgDynamicRebalanceIT extends BaseConf { ...@@ -63,20 +63,20 @@ public class OrderMsgDynamicRebalanceIT extends BaseConf {
MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
MQWait.waitConsumeAll(30 * 1000, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(30 * 1000, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner()); consumer2.getListener());
consumer2.shutdown(); consumer2.shutdown();
mqMsgs = new MessageQueueMsg(mqs, msgSize); mqMsgs = new MessageQueueMsg(mqs, msgSize);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
...@@ -94,22 +94,22 @@ public class OrderMsgDynamicRebalanceIT extends BaseConf { ...@@ -94,22 +94,22 @@ public class OrderMsgDynamicRebalanceIT extends BaseConf {
MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListner(), consumer3.getListner()); consumer2.getListener(), consumer3.getListener());
consumer3.shutdown(); consumer3.shutdown();
mqMsgs = new MessageQueueMsg(mqs, msgSize); mqMsgs = new MessageQueueMsg(mqs, msgSize);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
boolean recvAll = MQWait.waitConsumeAll(30 * 1000, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(30 * 1000, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner(), consumer3.getListner()); consumer1.getListener(), consumer2.getListener(), consumer3.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer3.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer3.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
} }
...@@ -59,13 +59,13 @@ public class OrderMsgIT extends BaseConf { ...@@ -59,13 +59,13 @@ public class OrderMsgIT extends BaseConf {
MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(mqMsgs.getMsgBodys()); .containsExactlyElementsIn(mqMsgs.getMsgBodys());
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
...@@ -77,13 +77,13 @@ public class OrderMsgIT extends BaseConf { ...@@ -77,13 +77,13 @@ public class OrderMsgIT extends BaseConf {
msgSize); msgSize);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(mqMsgs.getMsgBodys()); .containsExactlyElementsIn(mqMsgs.getMsgBodys());
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
...@@ -96,13 +96,13 @@ public class OrderMsgIT extends BaseConf { ...@@ -96,13 +96,13 @@ public class OrderMsgIT extends BaseConf {
msgSize); msgSize);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(mqMsgs.getMsgBodys()); .containsExactlyElementsIn(mqMsgs.getMsgBodys());
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
} }
...@@ -64,19 +64,19 @@ public class OrderMsgRebalanceIT extends BaseConf { ...@@ -64,19 +64,19 @@ public class OrderMsgRebalanceIT extends BaseConf {
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils.verifyBalance(producer.getAllMsgBody().size(), boolean balance = VerifyUtils.verifyBalance(producer.getAllMsgBody().size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody()).size(), consumer1.getListener().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size()); consumer2.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true); assertThat(balance).isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
...@@ -97,36 +97,36 @@ public class OrderMsgRebalanceIT extends BaseConf { ...@@ -97,36 +97,36 @@ public class OrderMsgRebalanceIT extends BaseConf {
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner(), consumer3.getListner(), consumer1.getListener(), consumer2.getListener(), consumer3.getListener(),
consumer4.getListner()); consumer4.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils boolean balance = VerifyUtils
.verifyBalance(producer.getAllMsgBody().size(), .verifyBalance(producer.getAllMsgBody().size(),
VerifyUtils VerifyUtils
.getFilterdMessage(producer.getAllMsgBody(), .getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody()) consumer1.getListener().getAllUndupMsgBody())
.size(), .size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size(), consumer2.getListener().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer3.getListner().getAllUndupMsgBody()).size(), consumer3.getListener().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer4.getListner().getAllUndupMsgBody()).size()); consumer4.getListener().getAllUndupMsgBody()).size());
logger.info(String.format("consumer1:%s;consumer2:%s;consumer3:%s,consumer4:%s", logger.info(String.format("consumer1:%s;consumer2:%s;consumer3:%s,consumer4:%s",
consumer1.getListner().getAllMsgBody().size(), consumer1.getListener().getAllMsgBody().size(),
consumer2.getListner().getAllMsgBody().size(), consumer2.getListener().getAllMsgBody().size(),
consumer3.getListner().getAllMsgBody().size(), consumer3.getListener().getAllMsgBody().size(),
consumer4.getListner().getAllMsgBody().size())); consumer4.getListener().getAllMsgBody().size()));
assertThat(balance).isEqualTo(true); assertThat(balance).isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer3.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer3.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer4.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer4.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
......
...@@ -60,13 +60,13 @@ public class OrderMsgWithTagIT extends BaseConf { ...@@ -60,13 +60,13 @@ public class OrderMsgWithTagIT extends BaseConf {
MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag); MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(mqMsgs.getMsgBodys()); .containsExactlyElementsIn(mqMsgs.getMsgBodys());
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
...@@ -80,13 +80,13 @@ public class OrderMsgWithTagIT extends BaseConf { ...@@ -80,13 +80,13 @@ public class OrderMsgWithTagIT extends BaseConf {
MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag); MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(mqMsgs.getMsgBodys()); .containsExactlyElementsIn(mqMsgs.getMsgBodys());
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
...@@ -106,13 +106,13 @@ public class OrderMsgWithTagIT extends BaseConf { ...@@ -106,13 +106,13 @@ public class OrderMsgWithTagIT extends BaseConf {
mqMsgs = new MessageQueueMsg(mqs, msgSize, tag1); mqMsgs = new MessageQueueMsg(mqs, msgSize, tag1);
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(mqMsgs.getMsgBodys()); .containsExactlyElementsIn(mqMsgs.getMsgBodys());
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
...@@ -134,12 +134,12 @@ public class OrderMsgWithTagIT extends BaseConf { ...@@ -134,12 +134,12 @@ public class OrderMsgWithTagIT extends BaseConf {
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner()); consumer1.getListener(), consumer2.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
...@@ -160,10 +160,10 @@ public class OrderMsgWithTagIT extends BaseConf { ...@@ -160,10 +160,10 @@ public class OrderMsgWithTagIT extends BaseConf {
producer.send(mqMsgs.getMsgsWithMQ()); producer.send(mqMsgs.getMsgsWithMQ());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer.getListner()); consumer.getListener());
assertThat(recvAll).isEqualTo(true); assertThat(recvAll).isEqualTo(true);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListener()).getMsgs()))
.isEqualTo(true); .isEqualTo(true);
} }
} }
...@@ -57,11 +57,11 @@ public class QueryMsgByIdIT extends BaseConf { ...@@ -57,11 +57,11 @@ public class QueryMsgByIdIT extends BaseConf {
int msgSize = 20; int msgSize = 20;
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(), Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())); consumer.getListener().getAllMsgBody()));
MessageExt recvMsg = (MessageExt) consumer.getListner().getFirstMsg(); MessageExt recvMsg = (MessageExt) consumer.getListener().getFirstMsg();
MessageExt queryMsg = null; MessageExt queryMsg = null;
try { try {
TestUtils.waitForMoment(3000); TestUtils.waitForMoment(3000);
......
...@@ -57,12 +57,12 @@ public class NormalMsgDelayIT extends DelayConf { ...@@ -57,12 +57,12 @@ public class NormalMsgDelayIT extends DelayConf {
producer.send(delayMsgs); producer.send(delayMsgs);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(), Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())); consumer.getListener().getAllMsgBody()));
Assert.assertEquals("Timer is not correct", true, Assert.assertEquals("Timer is not correct", true,
VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000, VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
((RMQDelayListner) consumer.getListner()).getMsgDelayTimes())); ((RMQDelayListner) consumer.getListener()).getMsgDelayTimes()));
} }
@Test @Test
...@@ -72,13 +72,13 @@ public class NormalMsgDelayIT extends DelayConf { ...@@ -72,13 +72,13 @@ public class NormalMsgDelayIT extends DelayConf {
producer.send(delayMsgs); producer.send(delayMsgs);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(),
DELAY_LEVEL[delayLevel - 1] * 1000 * 2); DELAY_LEVEL[delayLevel - 1] * 1000 * 2);
Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(), Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())); consumer.getListener().getAllMsgBody()));
Assert.assertEquals("Timer is not correct", true, Assert.assertEquals("Timer is not correct", true,
VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000, VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
((RMQDelayListner) consumer.getListner()).getMsgDelayTimes())); ((RMQDelayListner) consumer.getListener()).getMsgDelayTimes()));
} }
@Test @Test
...@@ -88,13 +88,13 @@ public class NormalMsgDelayIT extends DelayConf { ...@@ -88,13 +88,13 @@ public class NormalMsgDelayIT extends DelayConf {
producer.send(delayMsgs); producer.send(delayMsgs);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(),
DELAY_LEVEL[delayLevel - 1] * 1000 * 2); DELAY_LEVEL[delayLevel - 1] * 1000 * 2);
Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(), Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())); consumer.getListener().getAllMsgBody()));
Assert.assertEquals("Timer is not correct", true, Assert.assertEquals("Timer is not correct", true,
VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000, VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
((RMQDelayListner) consumer.getListner()).getMsgDelayTimes())); ((RMQDelayListner) consumer.getListener()).getMsgDelayTimes()));
} }
@Test @Test
...@@ -104,12 +104,12 @@ public class NormalMsgDelayIT extends DelayConf { ...@@ -104,12 +104,12 @@ public class NormalMsgDelayIT extends DelayConf {
producer.send(delayMsgs); producer.send(delayMsgs);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(),
DELAY_LEVEL[delayLevel - 1] * 1000 * 2); DELAY_LEVEL[delayLevel - 1] * 1000 * 2);
Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(), Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())); consumer.getListener().getAllMsgBody()));
Assert.assertEquals("Timer is not correct", true, Assert.assertEquals("Timer is not correct", true,
VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000, VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
((RMQDelayListner) consumer.getListner()).getMsgDelayTimes())); ((RMQDelayListner) consumer.getListener()).getMsgDelayTimes()));
} }
} }
...@@ -54,9 +54,9 @@ public class NormalMessageSendAndRecvIT extends BaseConf { ...@@ -54,9 +54,9 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
int msgSize = 10; int msgSize = 10;
producer.send(msgSize); producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListner().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册