提交 6a9628b3 编写于 作者: V vsair 提交者: dongeforever

[ROCKETMQ-179] Fix errors of IT test cases closes apache/incubator-rocketmq#94

上级 58f1574b
...@@ -461,10 +461,6 @@ ...@@ -461,10 +461,6 @@
<argLine>@{failsafeArgLine}</argLine> <argLine>@{failsafeArgLine}</argLine>
<excludes> <excludes>
<exclude>**/NormalMsgDelayIT.java</exclude> <exclude>**/NormalMsgDelayIT.java</exclude>
<exclude>**/BroadCastNormalMsgNotRecvIT.java</exclude>
<exclude>**/TagMessageWithSameGroupConsumerIT.java</exclude>
<exclude>**/AsyncSendWithMessageQueueSelectorIT.java</exclude>
<exclude>**/AsyncSendWithMessageQueueIT.java</exclude>
</excludes> </excludes>
</configuration> </configuration>
<executions> <executions>
......
...@@ -91,11 +91,21 @@ public abstract class MQCollector { ...@@ -91,11 +91,21 @@ public abstract class MQCollector {
} }
public void clearMsg() { public void clearMsg() {
msgBodys.resetData(); if (msgBodys != null) {
originMsgs.resetData(); msgBodys.resetData();
errorMsgs.resetData(); }
originMsgIndex.clear(); if (originMsgs != null) {
msgRTs.resetData(); originMsgs.resetData();
}
if (originMsgs != null) {
errorMsgs.resetData();
}
if (originMsgIndex != null) {
originMsgIndex.clear();
}
if (msgRTs != null) {
msgRTs.resetData();
}
} }
public void lockCollectors() { public void lockCollectors() {
......
...@@ -63,7 +63,9 @@ public class RMQNormalListner extends AbstractListener implements MessageListene ...@@ -63,7 +63,9 @@ public class RMQNormalListner extends AbstractListener implements MessageListene
msgBodys.addData(new String(msg.getBody())); msgBodys.addData(new String(msg.getBody()));
originMsgs.addData(msg); originMsgs.addData(msg);
originMsgIndex.put(new String(msg.getBody()), msg); if (originMsgIndex != null) {
originMsgIndex.put(new String(msg.getBody()), msg);
}
} }
return consumeStatus; return consumeStatus;
} }
......
...@@ -45,7 +45,6 @@ public class MQAdmin { ...@@ -45,7 +45,6 @@ public class MQAdmin {
mqAdminExt.start(); mqAdminExt.start();
mqAdminExt.createTopic(clusterName, topic, queueNum); mqAdminExt.createTopic(clusterName, topic, queueNum);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
} }
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
......
...@@ -46,6 +46,8 @@ public class IntegrationTestBase { ...@@ -46,6 +46,8 @@ public class IntegrationTestBase {
protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<>(); protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<>();
protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>(); protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>();
protected static int topicCreateTime = 30 * 1000; protected static int topicCreateTime = 30 * 1000;
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 256;
protected static final int INDEX_NUM = 1000;
protected static Random random = new Random(); protected static Random random = new Random();
...@@ -53,18 +55,30 @@ public class IntegrationTestBase { ...@@ -53,18 +55,30 @@ public class IntegrationTestBase {
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
@Override public void run() { @Override public void run() {
for (NamesrvController namesrvController : NAMESRV_CONTROLLERS) { try {
if (namesrvController != null) { for (BrokerController brokerController : BROKER_CONTROLLERS) {
namesrvController.shutdown(); if (brokerController != null) {
brokerController.shutdown();
}
} }
}
for (BrokerController brokerController : BROKER_CONTROLLERS) { // should destroy message store, otherwise could not delete the temp files.
if (brokerController != null) { for (BrokerController brokerController : BROKER_CONTROLLERS) {
brokerController.shutdown(); if (brokerController != null) {
brokerController.getMessageStore().destroy();
}
} }
}
for (File file : TMPE_FILES) { for (NamesrvController namesrvController : NAMESRV_CONTROLLERS) {
deleteFile(file); if (namesrvController != null) {
namesrvController.shutdown();
}
}
for (File file : TMPE_FILES) {
deleteFile(file);
}
} catch (Exception e){
logger.error("Shutdown error", e);
} }
} }
}); });
...@@ -75,7 +89,7 @@ public class IntegrationTestBase { ...@@ -75,7 +89,7 @@ public class IntegrationTestBase {
String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID(); String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
final File file = new File(baseDir); final File file = new File(baseDir);
if (file.exists()) { if (file.exists()) {
logger.info(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir)); logger.info(String.format("[%s] has already existed, please back up and remove it for integration tests", baseDir));
System.exit(1); System.exit(1);
} }
TMPE_FILES.add(file); TMPE_FILES.add(file);
...@@ -116,6 +130,9 @@ public class IntegrationTestBase { ...@@ -116,6 +130,9 @@ public class IntegrationTestBase {
storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setHaListenPort(8000 + random.nextInt(1000)); storeConfig.setHaListenPort(8000 + random.nextInt(1000));
storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE);
storeConfig.setMaxIndexNum(INDEX_NUM);
storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
try { try {
......
...@@ -36,6 +36,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -36,6 +36,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class);
private RMQNormalProducer producer = null; private RMQNormalProducer producer = null;
private String topic = null; private String topic = null;
private String tag = "tag";
@Before @Before
public void setUp() { public void setUp() {
...@@ -51,13 +52,12 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -51,13 +52,12 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
@Test @Test
public void testTwoConsumerWithSameGroup() { public void testTwoConsumerWithSameGroup() {
String tag = "jueyin";
int msgSize = 20; int msgSize = 20;
String originMsgDCName = RandomUtils.getStringByUUID(); String originMsgDCName = RandomUtils.getStringByUUID();
String msgBodyDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID();
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName)); new RMQNormalListner(originMsgDCName, msgBodyDCName));
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName)); new RMQNormalListner(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize); producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
...@@ -70,7 +70,6 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -70,7 +70,6 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
@Test @Test
public void testConsumerStartWithInterval() { public void testConsumerStartWithInterval() {
String tag = "jueyin";
int msgSize = 100; int msgSize = 100;
String originMsgDCName = RandomUtils.getStringByUUID(); String originMsgDCName = RandomUtils.getStringByUUID();
String msgBodyDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID();
...@@ -79,7 +78,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -79,7 +78,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
new RMQNormalListner(originMsgDCName, msgBodyDCName)); new RMQNormalListner(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize, 100); producer.send(tag, msgSize, 100);
TestUtils.waitForMoment(5); TestUtils.waitForMoment(5);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName)); new RMQNormalListner(originMsgDCName, msgBodyDCName));
TestUtils.waitForMoment(5); TestUtils.waitForMoment(5);
...@@ -90,8 +89,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -90,8 +89,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
} }
@Test @Test
public void testConsumerStartTwoAndCrashOnsAfterWhile() { public void testConsumerStartTwoAndCrashOneAfterWhile() {
String tag = "jueyin";
int msgSize = 100; int msgSize = 100;
String originMsgDCName = RandomUtils.getStringByUUID(); String originMsgDCName = RandomUtils.getStringByUUID();
String msgBodyDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID();
......
...@@ -33,7 +33,6 @@ import static com.google.common.truth.Truth.assertThat; ...@@ -33,7 +33,6 @@ import static com.google.common.truth.Truth.assertThat;
public class AsyncSendWithMessageQueueIT extends BaseConf { public class AsyncSendWithMessageQueueIT extends BaseConf {
private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class);
private static boolean sendFail = false;
private RMQAsyncSendProducer producer = null; private RMQAsyncSendProducer producer = null;
private String topic = null; private String topic = null;
...@@ -57,7 +56,7 @@ public class AsyncSendWithMessageQueueIT extends BaseConf { ...@@ -57,7 +56,7 @@ public class AsyncSendWithMessageQueueIT extends BaseConf {
MessageQueue mq = new MessageQueue(topic, broker1Name, queueId); MessageQueue mq = new MessageQueue(topic, broker1Name, queueId);
producer.asyncSend(msgSize, mq); producer.asyncSend(msgSize, mq);
producer.waitForResponse(5 * 1000); producer.waitForResponse(10 * 1000);
assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
...@@ -72,7 +71,7 @@ public class AsyncSendWithMessageQueueIT extends BaseConf { ...@@ -72,7 +71,7 @@ public class AsyncSendWithMessageQueueIT extends BaseConf {
mq = new MessageQueue(topic, broker2Name, queueId); mq = new MessageQueue(topic, broker2Name, queueId);
producer.asyncSend(msgSize, mq); producer.asyncSend(msgSize, mq);
producer.waitForResponse(5 * 1000); producer.waitForResponse(10 * 1000);
assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......
...@@ -36,7 +36,6 @@ import static com.google.common.truth.Truth.assertThat; ...@@ -36,7 +36,6 @@ import static com.google.common.truth.Truth.assertThat;
public class AsyncSendWithMessageQueueSelectorIT extends BaseConf { public class AsyncSendWithMessageQueueSelectorIT extends BaseConf {
private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class);
private static boolean sendFail = false;
private RMQAsyncSendProducer producer = null; private RMQAsyncSendProducer producer = null;
private String topic = null; private String topic = null;
......
...@@ -24,7 +24,6 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; ...@@ -24,7 +24,6 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory; import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQDelayListner; import org.apache.rocketmq.test.listener.rmq.concurrent.RMQDelayListner;
import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
import org.apache.rocketmq.test.util.VerifyUtils; import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
...@@ -43,7 +42,7 @@ public class NormalMsgDelayIT extends DelayConf { ...@@ -43,7 +42,7 @@ public class NormalMsgDelayIT extends DelayConf {
topic = initTopic(); topic = initTopic();
logger.info(String.format("use topic: %s;", topic)); logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic); producer = getProducer(nsAddr, topic);
consumer = getConsumer(nsAddr, topic, "*", new RMQOrderListener()); consumer = getConsumer(nsAddr, topic, "*", new RMQDelayListner());
} }
@After @After
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册