diff --git a/pom.xml b/pom.xml index feb8b148457f349d1cc44217d9da25c8a3d8ae15..6fd59ac5bcf85d1978e4178ab3e2bdfbe0d309fd 100644 --- a/pom.xml +++ b/pom.xml @@ -461,10 +461,6 @@ @{failsafeArgLine} **/NormalMsgDelayIT.java - **/BroadCastNormalMsgNotRecvIT.java - **/TagMessageWithSameGroupConsumerIT.java - **/AsyncSendWithMessageQueueSelectorIT.java - **/AsyncSendWithMessageQueueIT.java diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java index 42d4b629b4881a8f8de2f1818ea0c57f81cf9a24..7ccf92a7c3c4df561806725b1e2a27cf88f0a8c9 100644 --- a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java +++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java @@ -91,11 +91,21 @@ public abstract class MQCollector { } public void clearMsg() { - msgBodys.resetData(); - originMsgs.resetData(); - errorMsgs.resetData(); - originMsgIndex.clear(); - msgRTs.resetData(); + if (msgBodys != null) { + msgBodys.resetData(); + } + if (originMsgs != null) { + originMsgs.resetData(); + } + if (originMsgs != null) { + errorMsgs.resetData(); + } + if (originMsgIndex != null) { + originMsgIndex.clear(); + } + if (msgRTs != null) { + msgRTs.resetData(); + } } public void lockCollectors() { diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java index 0d408810a1f57ef8433fa3aceb32be820d3499aa..471fb4817ef80ef31977edec67322d6a38347165 100644 --- a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java +++ b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java @@ -63,7 +63,9 @@ public class RMQNormalListner extends AbstractListener implements MessageListene msgBodys.addData(new String(msg.getBody())); originMsgs.addData(msg); - originMsgIndex.put(new String(msg.getBody()), msg); + if (originMsgIndex != null) { + originMsgIndex.put(new String(msg.getBody()), msg); + } } return consumeStatus; } diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java index 680780aeb5bc66bfa3d11374995289af57bf247d..bd151d0561ef6e37732857ce75aca599184a996e 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java @@ -45,7 +45,6 @@ public class MQAdmin { mqAdminExt.start(); mqAdminExt.createTopic(clusterName, topic, queueNum); } catch (Exception e) { - e.printStackTrace(); } long startTime = System.currentTimeMillis(); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 5329991a3e3596dc3ddaee2f7f6375c9077f8ce8..9805eba7c42ce0e39110729ce3cba186c3970602 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -46,6 +46,8 @@ public class IntegrationTestBase { protected static final List BROKER_CONTROLLERS = new ArrayList<>(); protected static final List NAMESRV_CONTROLLERS = new ArrayList<>(); protected static int topicCreateTime = 30 * 1000; + protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 256; + protected static final int INDEX_NUM = 1000; protected static Random random = new Random(); @@ -53,18 +55,30 @@ public class IntegrationTestBase { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - for (NamesrvController namesrvController : NAMESRV_CONTROLLERS) { - if (namesrvController != null) { - namesrvController.shutdown(); + try { + for (BrokerController brokerController : BROKER_CONTROLLERS) { + if (brokerController != null) { + brokerController.shutdown(); + } } - } - for (BrokerController brokerController : BROKER_CONTROLLERS) { - if (brokerController != null) { - brokerController.shutdown(); + + // should destroy message store, otherwise could not delete the temp files. + for (BrokerController brokerController : BROKER_CONTROLLERS) { + if (brokerController != null) { + brokerController.getMessageStore().destroy(); + } } - } - for (File file : TMPE_FILES) { - deleteFile(file); + + for (NamesrvController namesrvController : NAMESRV_CONTROLLERS) { + if (namesrvController != null) { + namesrvController.shutdown(); + } + } + for (File file : TMPE_FILES) { + deleteFile(file); + } + } catch (Exception e){ + logger.error("Shutdown error", e); } } }); @@ -75,7 +89,7 @@ public class IntegrationTestBase { String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID(); final File file = new File(baseDir); if (file.exists()) { - logger.info(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir)); + logger.info(String.format("[%s] has already existed, please back up and remove it for integration tests", baseDir)); System.exit(1); } TMPE_FILES.add(file); @@ -116,6 +130,9 @@ public class IntegrationTestBase { storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); storeConfig.setHaListenPort(8000 + random.nextInt(1000)); + storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE); + storeConfig.setMaxIndexNum(INDEX_NUM); + storeConfig.setMaxHashSlotNum(INDEX_NUM * 4); nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); try { diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java index 4cf8161e39c4a0c7c8a659651882448c3d9425f7..135cbec8bd755b115d2c646b64c7da452d25bf82 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java @@ -36,6 +36,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); private RMQNormalProducer producer = null; private String topic = null; + private String tag = "tag"; @Before public void setUp() { @@ -51,13 +52,12 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { @Test public void testTwoConsumerWithSameGroup() { - String tag = "jueyin"; int msgSize = 20; String originMsgDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID(); RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, new RMQNormalListner(originMsgDCName, msgBodyDCName)); - RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, + getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, new RMQNormalListner(originMsgDCName, msgBodyDCName)); producer.send(tag, msgSize); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); @@ -70,7 +70,6 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { @Test public void testConsumerStartWithInterval() { - String tag = "jueyin"; int msgSize = 100; String originMsgDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID(); @@ -79,7 +78,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { new RMQNormalListner(originMsgDCName, msgBodyDCName)); producer.send(tag, msgSize, 100); TestUtils.waitForMoment(5); - RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, + getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, new RMQNormalListner(originMsgDCName, msgBodyDCName)); TestUtils.waitForMoment(5); @@ -90,8 +89,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { } @Test - public void testConsumerStartTwoAndCrashOnsAfterWhile() { - String tag = "jueyin"; + public void testConsumerStartTwoAndCrashOneAfterWhile() { int msgSize = 100; String originMsgDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID(); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java index 53a992c323ecef92dbd855546243e57f1ac696d5..24a75473ad9454272397d1182a869bc3b1611db3 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java @@ -33,7 +33,6 @@ import static com.google.common.truth.Truth.assertThat; public class AsyncSendWithMessageQueueIT extends BaseConf { private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); - private static boolean sendFail = false; private RMQAsyncSendProducer producer = null; private String topic = null; @@ -57,7 +56,7 @@ public class AsyncSendWithMessageQueueIT extends BaseConf { MessageQueue mq = new MessageQueue(topic, broker1Name, queueId); producer.asyncSend(msgSize, mq); - producer.waitForResponse(5 * 1000); + producer.waitForResponse(10 * 1000); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -72,7 +71,7 @@ public class AsyncSendWithMessageQueueIT extends BaseConf { mq = new MessageQueue(topic, broker2Name, queueId); producer.asyncSend(msgSize, mq); - producer.waitForResponse(5 * 1000); + producer.waitForResponse(10 * 1000); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java index 68c2b0e2dc08661bf2206b7fee63facebc7da913..843441d3f34a72b34e77bbbe8d7810db64ddcf66 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java @@ -36,7 +36,6 @@ import static com.google.common.truth.Truth.assertThat; public class AsyncSendWithMessageQueueSelectorIT extends BaseConf { private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); - private static boolean sendFail = false; private RMQAsyncSendProducer producer = null; private String topic = null; diff --git a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java index 5206dcb6e18319d7e93254f46ef22a8f05163181..dc5f230253ab3e551421033b42108e1d3c7adc0f 100644 --- a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java @@ -24,7 +24,6 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.factory.MQMessageFactory; import org.apache.rocketmq.test.listener.rmq.concurrent.RMQDelayListner; -import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Assert; @@ -43,7 +42,7 @@ public class NormalMsgDelayIT extends DelayConf { topic = initTopic(); logger.info(String.format("use topic: %s;", topic)); producer = getProducer(nsAddr, topic); - consumer = getConsumer(nsAddr, topic, "*", new RMQOrderListener()); + consumer = getConsumer(nsAddr, topic, "*", new RMQDelayListner()); } @After