From 1e4307e7da13db9c4f28796436eb67efc052ccb6 Mon Sep 17 00:00:00 2001 From: vsair Date: Fri, 21 Apr 2017 18:21:35 +0800 Subject: [PATCH] [ROCKETMQ-179] Fix errors of IT test cases closes apache/incubator-rocketmq#94 --- pom.xml | 4 -- .../test/clientinterface/MQCollector.java | 20 +++++++--- .../rmq/concurrent/RMQNormalListner.java | 4 +- .../apache/rocketmq/test/util/MQAdmin.java | 1 - .../test/base/IntegrationTestBase.java | 39 +++++++++++++------ .../TagMessageWithSameGroupConsumerIT.java | 10 ++--- .../async/AsyncSendWithMessageQueueIT.java | 5 +-- .../AsyncSendWithMessageQueueSelectorIT.java | 1 - .../rocketmq/test/delay/NormalMsgDelayIT.java | 3 +- 9 files changed, 53 insertions(+), 34 deletions(-) diff --git a/pom.xml b/pom.xml index feb8b148..6fd59ac5 100644 --- a/pom.xml +++ b/pom.xml @@ -461,10 +461,6 @@ @{failsafeArgLine} **/NormalMsgDelayIT.java - **/BroadCastNormalMsgNotRecvIT.java - **/TagMessageWithSameGroupConsumerIT.java - **/AsyncSendWithMessageQueueSelectorIT.java - **/AsyncSendWithMessageQueueIT.java diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java index 42d4b629..7ccf92a7 100644 --- a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java +++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java @@ -91,11 +91,21 @@ public abstract class MQCollector { } public void clearMsg() { - msgBodys.resetData(); - originMsgs.resetData(); - errorMsgs.resetData(); - originMsgIndex.clear(); - msgRTs.resetData(); + if (msgBodys != null) { + msgBodys.resetData(); + } + if (originMsgs != null) { + originMsgs.resetData(); + } + if (originMsgs != null) { + errorMsgs.resetData(); + } + if (originMsgIndex != null) { + originMsgIndex.clear(); + } + if (msgRTs != null) { + msgRTs.resetData(); + } } public void lockCollectors() { diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java index 0d408810..471fb481 100644 --- a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java +++ b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java @@ -63,7 +63,9 @@ public class RMQNormalListner extends AbstractListener implements MessageListene msgBodys.addData(new String(msg.getBody())); originMsgs.addData(msg); - originMsgIndex.put(new String(msg.getBody()), msg); + if (originMsgIndex != null) { + originMsgIndex.put(new String(msg.getBody()), msg); + } } return consumeStatus; } diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java index 680780ae..bd151d05 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java @@ -45,7 +45,6 @@ public class MQAdmin { mqAdminExt.start(); mqAdminExt.createTopic(clusterName, topic, queueNum); } catch (Exception e) { - e.printStackTrace(); } long startTime = System.currentTimeMillis(); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 5329991a..9805eba7 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -46,6 +46,8 @@ public class IntegrationTestBase { protected static final List BROKER_CONTROLLERS = new ArrayList<>(); protected static final List NAMESRV_CONTROLLERS = new ArrayList<>(); protected static int topicCreateTime = 30 * 1000; + protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 256; + protected static final int INDEX_NUM = 1000; protected static Random random = new Random(); @@ -53,18 +55,30 @@ public class IntegrationTestBase { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - for (NamesrvController namesrvController : NAMESRV_CONTROLLERS) { - if (namesrvController != null) { - namesrvController.shutdown(); + try { + for (BrokerController brokerController : BROKER_CONTROLLERS) { + if (brokerController != null) { + brokerController.shutdown(); + } } - } - for (BrokerController brokerController : BROKER_CONTROLLERS) { - if (brokerController != null) { - brokerController.shutdown(); + + // should destroy message store, otherwise could not delete the temp files. + for (BrokerController brokerController : BROKER_CONTROLLERS) { + if (brokerController != null) { + brokerController.getMessageStore().destroy(); + } } - } - for (File file : TMPE_FILES) { - deleteFile(file); + + for (NamesrvController namesrvController : NAMESRV_CONTROLLERS) { + if (namesrvController != null) { + namesrvController.shutdown(); + } + } + for (File file : TMPE_FILES) { + deleteFile(file); + } + } catch (Exception e){ + logger.error("Shutdown error", e); } } }); @@ -75,7 +89,7 @@ public class IntegrationTestBase { String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID(); final File file = new File(baseDir); if (file.exists()) { - logger.info(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir)); + logger.info(String.format("[%s] has already existed, please back up and remove it for integration tests", baseDir)); System.exit(1); } TMPE_FILES.add(file); @@ -116,6 +130,9 @@ public class IntegrationTestBase { storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); storeConfig.setHaListenPort(8000 + random.nextInt(1000)); + storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE); + storeConfig.setMaxIndexNum(INDEX_NUM); + storeConfig.setMaxHashSlotNum(INDEX_NUM * 4); nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); try { diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java index 4cf8161e..135cbec8 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java @@ -36,6 +36,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); private RMQNormalProducer producer = null; private String topic = null; + private String tag = "tag"; @Before public void setUp() { @@ -51,13 +52,12 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { @Test public void testTwoConsumerWithSameGroup() { - String tag = "jueyin"; int msgSize = 20; String originMsgDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID(); RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, new RMQNormalListner(originMsgDCName, msgBodyDCName)); - RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, + getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, new RMQNormalListner(originMsgDCName, msgBodyDCName)); producer.send(tag, msgSize); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); @@ -70,7 +70,6 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { @Test public void testConsumerStartWithInterval() { - String tag = "jueyin"; int msgSize = 100; String originMsgDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID(); @@ -79,7 +78,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { new RMQNormalListner(originMsgDCName, msgBodyDCName)); producer.send(tag, msgSize, 100); TestUtils.waitForMoment(5); - RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, + getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, new RMQNormalListner(originMsgDCName, msgBodyDCName)); TestUtils.waitForMoment(5); @@ -90,8 +89,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { } @Test - public void testConsumerStartTwoAndCrashOnsAfterWhile() { - String tag = "jueyin"; + public void testConsumerStartTwoAndCrashOneAfterWhile() { int msgSize = 100; String originMsgDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID(); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java index 53a992c3..24a75473 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java @@ -33,7 +33,6 @@ import static com.google.common.truth.Truth.assertThat; public class AsyncSendWithMessageQueueIT extends BaseConf { private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); - private static boolean sendFail = false; private RMQAsyncSendProducer producer = null; private String topic = null; @@ -57,7 +56,7 @@ public class AsyncSendWithMessageQueueIT extends BaseConf { MessageQueue mq = new MessageQueue(topic, broker1Name, queueId); producer.asyncSend(msgSize, mq); - producer.waitForResponse(5 * 1000); + producer.waitForResponse(10 * 1000); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -72,7 +71,7 @@ public class AsyncSendWithMessageQueueIT extends BaseConf { mq = new MessageQueue(topic, broker2Name, queueId); producer.asyncSend(msgSize, mq); - producer.waitForResponse(5 * 1000); + producer.waitForResponse(10 * 1000); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java index 68c2b0e2..843441d3 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java @@ -36,7 +36,6 @@ import static com.google.common.truth.Truth.assertThat; public class AsyncSendWithMessageQueueSelectorIT extends BaseConf { private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); - private static boolean sendFail = false; private RMQAsyncSendProducer producer = null; private String topic = null; diff --git a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java index 5206dcb6..dc5f2302 100644 --- a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java @@ -24,7 +24,6 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.factory.MQMessageFactory; import org.apache.rocketmq.test.listener.rmq.concurrent.RMQDelayListner; -import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Assert; @@ -43,7 +42,7 @@ public class NormalMsgDelayIT extends DelayConf { topic = initTopic(); logger.info(String.format("use topic: %s;", topic)); producer = getProducer(nsAddr, topic); - consumer = getConsumer(nsAddr, topic, "*", new RMQOrderListener()); + consumer = getConsumer(nsAddr, topic, "*", new RMQDelayListner()); } @After -- GitLab