From 881aef5d3195b918c87043ea75a19a3fb805b2e0 Mon Sep 17 00:00:00 2001 From: yukon Date: Sun, 22 Jan 2017 16:23:54 +0800 Subject: [PATCH] [ROCKETMQ-56] Polish unit tests for rocketmq-store --- .../store/DefaultMessageStoreTest.java | 65 ++----- .../rocketmq/store/MappedFileQueueTest.java | 162 +++++------------- .../apache/rocketmq/store/MappedFileTest.java | 56 ++---- .../rocketmq/store/StoreCheckpointTest.java | 21 +-- .../rocketmq/store/index/IndexFileTest.java | 26 ++- .../store/schedule/ScheduleMessageTest.java | 133 -------------- 6 files changed, 91 insertions(+), 372 deletions(-) delete mode 100644 store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 477ac10b..5c9c46f7 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -24,42 +24,28 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertTrue; public class DefaultMessageStoreTest { - private static final Logger logger = LoggerFactory.getLogger(DefaultMessageStoreTest.class); - - private static final String StoreMessage = "Once, there was a chance for me!"; - - private static int QUEUE_TOTAL = 100; - - private static AtomicInteger QueueId = new AtomicInteger(0); - - private static SocketAddress BornHost; - - private static SocketAddress StoreHost; - - private static byte[] MessageBody; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { + private final String StoreMessage = "Once, there was a chance for me!"; + private int QUEUE_TOTAL = 100; + private AtomicInteger QueueId = new AtomicInteger(0); + private SocketAddress BornHost; + private SocketAddress StoreHost; + private byte[] MessageBody; + + @Before + public void init() throws Exception { StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); } - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - @Test - public void test_write_read() throws Exception { - logger.debug("================================================================"); + public void testWriteAndRead() throws Exception { long totalMsgs = 100; QUEUE_TOTAL = 1; MessageBody = StoreMessage.getBytes(); @@ -77,29 +63,23 @@ public class DefaultMessageStoreTest { master.start(); try { for (long i = 0; i < totalMsgs; i++) { - PutMessageResult result = master.putMessage(buildMessage()); - logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId()); + master.putMessage(buildMessage()); } for (long i = 0; i < totalMsgs; i++) { GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); - if (result == null) { - logger.debug("result == null " + i); - } - assertTrue(result != null); + assertThat(result).isNotNull(); result.release(); - logger.debug("read " + i + " OK"); } } finally { master.shutdown(); master.destroy(); } - logger.debug("================================================================"); } public MessageExtBrokerInner buildMessage() { MessageExtBrokerInner msg = new MessageExtBrokerInner(); - msg.setTopic("AAA"); + msg.setTopic("FooBar"); msg.setTags("TAG1"); msg.setKeys("Hello"); msg.setBody(MessageBody); @@ -113,8 +93,7 @@ public class DefaultMessageStoreTest { } @Test - public void test_group_commit() throws Exception { - logger.debug("================================================================"); + public void testGroupCommit() throws Exception { long totalMsgs = 100; QUEUE_TOTAL = 1; MessageBody = StoreMessage.getBytes(); @@ -128,32 +107,24 @@ public class DefaultMessageStoreTest { master.start(); try { for (long i = 0; i < totalMsgs; i++) { - PutMessageResult result = master.putMessage(buildMessage()); - logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId()); + master.putMessage(buildMessage()); } for (long i = 0; i < totalMsgs; i++) { GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); - if (result == null) { - logger.debug("result == null " + i); - } - assertTrue(result != null); + assertThat(result).isNotNull(); result.release(); - logger.debug("read " + i + " OK"); } } finally { master.shutdown(); master.destroy(); } - logger.debug("================================================================"); } private class MyMessageArrivingListener implements MessageArrivingListener { - @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode) { - // Do nothing here } } } diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java index 2d6c112a..f1f9c1fb 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java @@ -15,70 +15,30 @@ * limitations under the License. */ -/** - * $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $ - */ package org.apache.rocketmq.store; import java.nio.ByteBuffer; import java.util.Arrays; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; public class MappedFileQueueTest { - private static final Logger logger = LoggerFactory.getLogger(MappedFileQueueTest.class); - - // private static final String StoreMessage = - // "Once, there was a chance for me! but I did not treasure it. if"; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - - @Before - public void setUp() throws Exception { - } - - @After - public void tearDown() throws Exception { - } - @Test - public void test_getLastMappedFile() { + public void testGetLastMappedFile() { final String fixedMsg = "0123456789abcdef"; - logger.debug("================================================================"); MappedFileQueue mappedFileQueue = new MappedFileQueue("target/unit_test_store/a/", 1024, null); for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); - assertTrue(mappedFile != null); - - boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); - if (!result) { - logger.debug("appendMessage " + i); - } - assertTrue(result); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue(); } mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); - logger.debug("MappedFileQueue.getLastMappedFile() OK"); } @Test @@ -86,162 +46,134 @@ public class MappedFileQueueTest { // four-byte string. final String fixedMsg = "abcd"; - logger.debug("================================================================"); MappedFileQueue mappedFileQueue = new MappedFileQueue("target/unit_test_store/b/", 1024, null); for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); - assertTrue(mappedFile != null); - - boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); - assertTrue(result); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue(); } - assertEquals(fixedMsg.getBytes().length * 1024, mappedFileQueue.getMappedMemorySize()); + assertThat(mappedFileQueue.getMappedMemorySize()).isEqualTo(fixedMsg.getBytes().length * 1024); MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0); - assertTrue(mappedFile != null); - assertEquals(mappedFile.getFileFromOffset(), 0); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.getFileFromOffset()).isEqualTo(0); mappedFile = mappedFileQueue.findMappedFileByOffset(100); - assertTrue(mappedFile != null); - assertEquals(mappedFile.getFileFromOffset(), 0); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.getFileFromOffset()).isEqualTo(0); mappedFile = mappedFileQueue.findMappedFileByOffset(1024); - assertTrue(mappedFile != null); - assertEquals(mappedFile.getFileFromOffset(), 1024); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.getFileFromOffset()).isEqualTo(1024); mappedFile = mappedFileQueue.findMappedFileByOffset(1024 + 100); - assertTrue(mappedFile != null); - assertEquals(mappedFile.getFileFromOffset(), 1024); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.getFileFromOffset()).isEqualTo(1024); mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2); - assertTrue(mappedFile != null); - assertEquals(mappedFile.getFileFromOffset(), 1024 * 2); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.getFileFromOffset()).isEqualTo(1024 * 2); mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100); - assertTrue(mappedFile != null); - assertEquals(mappedFile.getFileFromOffset(), 1024 * 2); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.getFileFromOffset()).isEqualTo(1024 * 2); // over mapped memory size. mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4); - assertTrue(mappedFile == null); + assertThat(mappedFile).isNull(); mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4 + 100); - assertTrue(mappedFile == null); + assertThat(mappedFile).isNull(); mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); - logger.debug("MappedFileQueue.findMappedFileByOffset() OK"); } @Test - public void test_commit() { + public void testAppendMessage() { final String fixedMsg = "0123456789abcdef"; - logger.debug("================================================================"); MappedFileQueue mappedFileQueue = new MappedFileQueue("target/unit_test_store/c/", 1024, null); for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); - assertTrue(mappedFile != null); - - boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); - assertTrue(result); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue(); } - boolean result = mappedFileQueue.flush(0); - assertFalse(result); - assertEquals(1024 * 1, mappedFileQueue.getFlushedWhere()); + assertThat(mappedFileQueue.flush(0)).isFalse(); + assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024); - result = mappedFileQueue.flush(0); - assertFalse(result); - assertEquals(1024 * 2, mappedFileQueue.getFlushedWhere()); + assertThat(mappedFileQueue.flush(0)).isFalse(); + assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 2); - result = mappedFileQueue.flush(0); - assertFalse(result); - assertEquals(1024 * 3, mappedFileQueue.getFlushedWhere()); + assertThat(mappedFileQueue.flush(0)).isFalse(); + assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 3); - result = mappedFileQueue.flush(0); - assertFalse(result); - assertEquals(1024 * 4, mappedFileQueue.getFlushedWhere()); + assertThat(mappedFileQueue.flush(0)).isFalse(); + assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 4); - result = mappedFileQueue.flush(0); - assertFalse(result); - assertEquals(1024 * 5, mappedFileQueue.getFlushedWhere()); + assertThat(mappedFileQueue.flush(0)).isFalse(); + assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 5); - result = mappedFileQueue.flush(0); - assertFalse(result); - assertEquals(1024 * 6, mappedFileQueue.getFlushedWhere()); + assertThat(mappedFileQueue.flush(0)).isFalse(); + assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 6); mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); - logger.debug("MappedFileQueue.flush() OK"); } @Test - public void test_getMappedMemorySize() { + public void testGetMappedMemorySize() { final String fixedMsg = "abcd"; - logger.debug("================================================================"); MappedFileQueue mappedFileQueue = new MappedFileQueue("target/unit_test_store/d/", 1024, null); for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); - assertTrue(mappedFile != null); - - boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); - assertTrue(result); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue(); } - assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize()); - + assertThat(mappedFileQueue.getMappedMemorySize()).isEqualTo(fixedMsg.length() * 1024); mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); - logger.debug("MappedFileQueue.getMappedMemorySize() OK"); } - @Test - public void test_deleteExpiredFileByOffset() { - - logger.debug("================================================================"); + public void testDeleteExpiredFileByOffset() { MappedFileQueue mappedFileQueue = new MappedFileQueue("target/unit_test_store/e", 5120, null); for (int i = 0; i < 2048; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); - assertNotNull(mappedFile); - + assertThat(mappedFile).isNotNull(); ByteBuffer byteBuffer = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE); byteBuffer.putLong(i); byte[] padding = new byte[12]; - Arrays.fill(padding, (byte)'0'); + Arrays.fill(padding, (byte) '0'); byteBuffer.put(padding); byteBuffer.flip(); - boolean result = mappedFile.appendMessage(byteBuffer.array()); - - assertTrue(result); + assertThat(mappedFile.appendMessage(byteBuffer.array())).isTrue(); } MappedFile first = mappedFileQueue.getFirstMappedFile(); first.hold(); - int count = mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE); - assertEquals(0, count); + assertThat(mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE)).isEqualTo(0); first.release(); - count = mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE); - assertTrue(count > 0); + assertThat(mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE)).isGreaterThan(0); first = mappedFileQueue.getFirstMappedFile(); - assertTrue(first.getFileFromOffset() > 0); + assertThat(first.getFileFromOffset()).isGreaterThan(0); mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); - logger.debug("MappedFileQueue.deleteExpiredFileByOffset() OK"); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java index 1dd2043c..d2736acd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java @@ -21,66 +21,30 @@ package org.apache.rocketmq.store; import java.io.IOException; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; public class MappedFileTest { - - private static final Logger logger = LoggerFactory.getLogger(MappedFileTest.class); - - private static final String StoreMessage = "Once, there was a chance for me!"; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } + private final String storeMessage = "Once, there was a chance for me!"; @Test - public void test_write_read() throws IOException { + public void testSelectMappedBuffer() throws IOException { MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64); - boolean result = mappedFile.appendMessage(StoreMessage.getBytes()); - assertTrue(result); - logger.debug("write OK"); + boolean result = mappedFile.appendMessage(storeMessage.getBytes()); + assertThat(result).isTrue(); SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0); - byte[] data = new byte[StoreMessage.length()]; + byte[] data = new byte[storeMessage.length()]; selectMappedBufferResult.getByteBuffer().get(data); String readString = new String(data); - logger.debug("Read: " + readString); - assertTrue(readString.equals(StoreMessage)); + assertThat(readString).isEqualTo(storeMessage); mappedFile.shutdown(1000); - assertTrue(!mappedFile.isAvailable()); - selectMappedBufferResult.release(); - assertTrue(mappedFile.isCleanupOver()); - assertTrue(mappedFile.destroy(1000)); - } - - @Ignore - public void test_jvm_crashed() throws IOException { - MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/10086", 1024 * 64); - boolean result = mappedFile.appendMessage(StoreMessage.getBytes()); - assertTrue(result); - logger.debug("write OK"); - - SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0); + assertThat(mappedFile.isAvailable()).isFalse(); selectMappedBufferResult.release(); - mappedFile.shutdown(1000); - - byte[] data = new byte[StoreMessage.length()]; - selectMappedBufferResult.getByteBuffer().get(data); - String readString = new String(data); - logger.debug(readString); + assertThat(mappedFile.isCleanupOver()).isTrue(); + assertThat(mappedFile.destroy(1000)).isTrue(); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java index e87da961..1af63d4f 100644 --- a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java @@ -21,24 +21,13 @@ package org.apache.rocketmq.store; import java.io.IOException; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; public class StoreCheckpointTest { - @BeforeClass - public static void setUpBeforeClass() throws Exception { - - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - @Test - public void test_write_read() throws IOException { + public void testWriteAndRead() throws IOException { StoreCheckpoint storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000"); long physicMsgTimestamp = 0xAABB; long logicsMsgTimestamp = 0xCCDD; @@ -47,10 +36,10 @@ public class StoreCheckpointTest { storeCheckpoint.flush(); long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp(); - assertTrue(diff == 3000); + assertThat(diff).isEqualTo(3000); storeCheckpoint.shutdown(); storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000"); - assertTrue(physicMsgTimestamp == storeCheckpoint.getPhysicMsgTimestamp()); - assertTrue(logicsMsgTimestamp == storeCheckpoint.getLogicsMsgTimestamp()); + assertThat(storeCheckpoint.getPhysicMsgTimestamp()).isEqualTo(physicMsgTimestamp); + assertThat(storeCheckpoint.getLogicsMsgTimestamp()).isEqualTo(logicsMsgTimestamp); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java index 956f8674..f172e65a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java @@ -24,47 +24,43 @@ import java.util.ArrayList; import java.util.List; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; public class IndexFileTest { - private static final int HASH_SLOT_NUM = 100; - private static final int INDEX_NUM = 400; + private final int HASH_SLOT_NUM = 100; + private final int INDEX_NUM = 400; @Test - public void test_put_index() throws Exception { + public void testPutKey() throws Exception { IndexFile indexFile = new IndexFile("100", HASH_SLOT_NUM, INDEX_NUM, 0, 0); for (long i = 0; i < (INDEX_NUM - 1); i++) { boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis()); - assertTrue(putResult); + assertThat(putResult).isTrue(); } // put over index file capacity. boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); - assertFalse(putResult); - + assertThat(putResult).isFalse(); indexFile.destroy(0); } @Test - public void test_put_get_index() throws Exception { + public void testSelectPhyOffset() throws Exception { IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0); for (long i = 0; i < (INDEX_NUM - 1); i++) { boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis()); - assertTrue(putResult); + assertThat(putResult).isTrue(); } // put over index file capacity. boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); - assertFalse(putResult); + assertThat(putResult).isFalse(); final List phyOffsets = new ArrayList(); indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true); - assertFalse(phyOffsets.isEmpty()); - assertEquals(1, phyOffsets.size()); - + assertThat(phyOffsets).isNotEmpty(); + assertThat(phyOffsets.size()).isEqualTo(1); indexFile.destroy(0); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java deleted file mode 100644 index 692dfc0d..00000000 --- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $ - */ -package org.apache.rocketmq.store.schedule; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.GetMessageResult; -import org.apache.rocketmq.store.MessageExtBrokerInner; -import org.apache.rocketmq.store.MessageStore; -import org.apache.rocketmq.store.PutMessageResult; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import static org.junit.Assert.assertTrue; - -@Ignore -public class ScheduleMessageTest { - private static final String StoreMessage = "Once, there was a chance for me!"; - - private static int QUEUE_TOTAL = 100; - - private static AtomicInteger QueueId = new AtomicInteger(0); - - private static SocketAddress BornHost; - - private static SocketAddress StoreHost; - - private static byte[] MessageBody; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); - BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - - @Test - public void test_delay_message() throws Exception { - System.out.println("================================================================"); - long totalMsgs = 10000; - QUEUE_TOTAL = 32; - - MessageBody = StoreMessage.getBytes(); - - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16); - messageStoreConfig.setMaxHashSlotNum(100); - messageStoreConfig.setMaxIndexNum(1000 * 10); - - MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null); - - boolean load = master.load(); - assertTrue(load); - - master.start(); - for (int i = 0; i < totalMsgs; i++) { - MessageExtBrokerInner msg = buildMessage(); - msg.setDelayTimeLevel(i % 4); - - PutMessageResult result = master.putMessage(msg); - System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId()); - } - - System.out.println("write message over, wait time up"); - Thread.sleep(1000 * 20); - - for (long i = 0; i < totalMsgs; i++) { - try { - GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); - if (result == null) { - System.out.println("result == null " + i); - } - assertTrue(result != null); - result.release(); - System.out.println("read " + i + " OK"); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - Thread.sleep(1000 * 15); - - master.shutdown(); - - master.destroy(); - System.out.println("================================================================"); - } - - public MessageExtBrokerInner buildMessage() { - MessageExtBrokerInner msg = new MessageExtBrokerInner(); - msg.setTopic("AAA"); - msg.setTags("TAG1"); - msg.setKeys("Hello"); - msg.setBody(MessageBody); - msg.setKeys(String.valueOf(System.currentTimeMillis())); - msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); - msg.setSysFlag(4); - msg.setBornTimestamp(System.currentTimeMillis()); - msg.setStoreHost(StoreHost); - msg.setBornHost(BornHost); - - return msg; - } -} -- GitLab