提交 2aaf036e 编写于 作者: W Willem Jiang

ROCKETMQ-3 Clean up the unit test code of rocketmq-store

上级 fe4ecaa2
......@@ -347,12 +347,7 @@
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
</resource>
</resources>
<!-- We are not suppose to setup the customer resources here-->
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
......
......@@ -29,18 +29,28 @@
<name>rocketmq-store ${project.version}</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -17,11 +17,14 @@
package com.alibaba.rocketmq.store;
import com.alibaba.rocketmq.common.BrokerConfig;
import com.alibaba.rocketmq.store.config.FlushDiskType;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
......@@ -35,6 +38,8 @@ import static org.junit.Assert.assertTrue;
* @author shijia.wxr
*/
public class DefaultMessageStoreTest {
private static final Logger logger = LoggerFactory.getLogger(DefaultMessageStoreTest.class);
private static final String StoreMessage = "Once, there was a chance for me!";
private static int QUEUE_TOTAL = 100;
......@@ -59,7 +64,7 @@ public class DefaultMessageStoreTest {
@Test
public void test_write_read() throws Exception {
System.out.println("================================================================");
logger.debug("================================================================");
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
......@@ -69,34 +74,32 @@ public class DefaultMessageStoreTest {
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
boolean load = master.load();
assertTrue(load);
master.start();
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
}
for (long i = 0; i < totalMsgs; i++) {
try {
try {
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
}
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
if (result == null) {
System.out.println("result == null " + i);
logger.debug("result == null " + i);
}
assertTrue(result != null);
result.release();
System.out.println("read " + i + " OK");
} catch (Exception e) {
e.printStackTrace();
logger.debug("read " + i + " OK");
}
} finally {
master.shutdown();
master.destroy();
}
master.shutdown();
master.destroy();
System.out.println("================================================================");
logger.debug("================================================================");
}
public MessageExtBrokerInner buildMessage() {
......@@ -116,39 +119,46 @@ public class DefaultMessageStoreTest {
@Test
public void test_group_commit() throws Exception {
System.out.println("================================================================");
logger.debug("================================================================");
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
boolean load = master.load();
assertTrue(load);
master.start();
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
}
for (long i = 0; i < totalMsgs; i++) {
try {
try {
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
}
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
if (result == null) {
System.out.println("result == null " + i);
logger.debug("result == null " + i);
}
assertTrue(result != null);
result.release();
System.out.println("read " + i + " OK");
} catch (Exception e) {
e.printStackTrace();
logger.debug("read " + i + " OK");
}
} finally {
master.shutdown();
master.destroy();
}
logger.debug("================================================================");
}
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
// Do nothing here
}
master.shutdown();
master.destroy();
System.out.println("================================================================");
}
}
......@@ -21,11 +21,14 @@
package com.alibaba.rocketmq.store;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class MappedFileQueueTest {
private static final Logger logger = LoggerFactory.getLogger(MappedFileQueueTest.class);
// private static final String StoreMessage =
// "Once, there was a chance for me! but I did not treasure it. if";
......@@ -49,7 +52,7 @@ public class MappedFileQueueTest {
@Test
public void test_getLastMapedFile() {
final String fixedMsg = "0123456789abcdef";
System.out.println("================================================================");
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/a/", 1024, null);
......@@ -58,21 +61,21 @@ public class MappedFileQueueTest {
assertTrue(mappedFile != null);
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
if (!result) {
System.out.println("appendMessage " + i);
logger.debug("appendMessage " + i);
}
assertTrue(result);
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
System.out.println("MappedFileQueue.getLastMappedFile() OK");
logger.debug("MappedFileQueue.getLastMappedFile() OK");
}
@Test
public void test_findMapedFileByOffset() {
final String fixedMsg = "abcd";
System.out.println("================================================================");
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/b/", 1024, null);
......@@ -80,40 +83,34 @@ public class MappedFileQueueTest {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
// System.out.println("appendMessage " + bytes);
// logger.debug("appendMessage " + bytes);
assertTrue(result);
}
MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 0);
System.out.println(mappedFile.getFileFromOffset());
mappedFile = mappedFileQueue.findMappedFileByOffset(100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 0);
System.out.println(mappedFile.getFileFromOffset());
mappedFile = mappedFileQueue.findMappedFileByOffset(1024);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024);
System.out.println(mappedFile.getFileFromOffset());
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 + 100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024);
System.out.println(mappedFile.getFileFromOffset());
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
System.out.println(mappedFile.getFileFromOffset());
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
System.out.println(mappedFile.getFileFromOffset());
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4);
assertTrue(mappedFile == null);
......@@ -122,13 +119,13 @@ public class MappedFileQueueTest {
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
System.out.println("MappedFileQueue.findMappedFileByOffset() OK");
logger.debug("MappedFileQueue.findMappedFileByOffset() OK");
}
@Test
public void test_commit() {
final String fixedMsg = "0123456789abcdef";
System.out.println("================================================================");
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/c/", 1024, null);
......@@ -142,42 +139,36 @@ public class MappedFileQueueTest {
boolean result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 1, mappedFileQueue.getFlushedWhere());
System.out.println("1 " + result + " " + mappedFileQueue.getFlushedWhere());
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 2, mappedFileQueue.getFlushedWhere());
System.out.println("2 " + result + " " + mappedFileQueue.getFlushedWhere());
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 3, mappedFileQueue.getFlushedWhere());
System.out.println("3 " + result + " " + mappedFileQueue.getFlushedWhere());
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 4, mappedFileQueue.getFlushedWhere());
System.out.println("4 " + result + " " + mappedFileQueue.getFlushedWhere());
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 5, mappedFileQueue.getFlushedWhere());
System.out.println("5 " + result + " " + mappedFileQueue.getFlushedWhere());
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 6, mappedFileQueue.getFlushedWhere());
System.out.println("6 " + result + " " + mappedFileQueue.getFlushedWhere());
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
System.out.println("MappedFileQueue.flush() OK");
logger.debug("MappedFileQueue.flush() OK");
}
@Test
public void test_getMapedMemorySize() {
final String fixedMsg = "abcd";
System.out.println("================================================================");
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/d/", 1024, null);
......@@ -191,7 +182,7 @@ public class MappedFileQueueTest {
assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize());
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
System.out.println("MappedFileQueue.getMappedMemorySize() OK");
logger.debug("MappedFileQueue.getMappedMemorySize() OK");
}
}
......@@ -24,6 +24,8 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
......@@ -31,7 +33,10 @@ import static org.junit.Assert.assertTrue;
public class MappedFileTest {
private static final Logger logger = LoggerFactory.getLogger(MappedFileTest.class);
private static final String StoreMessage = "Once, there was a chance for me!";
@BeforeClass
......@@ -44,50 +49,41 @@ public class MappedFileTest {
}
@Test
public void test_write_read() {
try {
MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);
boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
assertTrue(result);
System.out.println("write OK");
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
byte[] data = new byte[StoreMessage.length()];
selectMappedBufferResult.getByteBuffer().get(data);
String readString = new String(data);
System.out.println("Read: " + readString);
assertTrue(readString.equals(StoreMessage));
mappedFile.shutdown(1000);
assertTrue(!mappedFile.isAvailable());
selectMappedBufferResult.release();
assertTrue(mappedFile.isCleanupOver());
assertTrue(mappedFile.destroy(1000));
} catch (IOException e) {
e.printStackTrace();
}
public void test_write_read() throws IOException {
MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);
boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
assertTrue(result);
logger.debug("write OK");
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
byte[] data = new byte[StoreMessage.length()];
selectMappedBufferResult.getByteBuffer().get(data);
String readString = new String(data);
logger.debug("Read: " + readString);
assertTrue(readString.equals(StoreMessage));
mappedFile.shutdown(1000);
assertTrue(!mappedFile.isAvailable());
selectMappedBufferResult.release();
assertTrue(mappedFile.isCleanupOver());
assertTrue(mappedFile.destroy(1000));
}
@Ignore
public void test_jvm_crashed() {
try {
MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/10086", 1024 * 64);
boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
assertTrue(result);
System.out.println("write OK");
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
selectMappedBufferResult.release();
mappedFile.shutdown(1000);
byte[] data = new byte[StoreMessage.length()];
selectMappedBufferResult.getByteBuffer().get(data);
String readString = new String(data);
System.out.println(readString);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
public void test_jvm_crashed() throws IOException {
MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/10086", 1024 * 64);
boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
assertTrue(result);
logger.debug("write OK");
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
selectMappedBufferResult.release();
mappedFile.shutdown(1000);
byte[] data = new byte[StoreMessage.length()];
selectMappedBufferResult.getByteBuffer().get(data);
String readString = new String(data);
logger.debug(readString);
}
}
......@@ -25,6 +25,7 @@ import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.net.InetAddress;
......@@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
@Ignore("This test need to be fixed!")
public class RecoverTest {
private static final String StoreMessage = "Once, there was a chance for me!aaaaaaaaaaaaaaaaaaaaaaaa";
......
......@@ -24,6 +24,8 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertTrue;
......@@ -38,24 +40,19 @@ public class StoreCheckpointTest {
}
@Test
public void test_write_read() {
try {
StoreCheckpoint storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000");
long physicMsgTimestamp = 0xAABB;
long logicsMsgTimestamp = 0xCCDD;
storeCheckpoint.setPhysicMsgTimestamp(physicMsgTimestamp);
storeCheckpoint.setLogicsMsgTimestamp(logicsMsgTimestamp);
storeCheckpoint.flush();
long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp();
assertTrue(diff == 3000);
storeCheckpoint.shutdown();
storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000");
assertTrue(physicMsgTimestamp == storeCheckpoint.getPhysicMsgTimestamp());
assertTrue(logicsMsgTimestamp == storeCheckpoint.getLogicsMsgTimestamp());
} catch (Throwable e) {
e.printStackTrace();
assertTrue(false);
}
public void test_write_read() throws IOException {
StoreCheckpoint storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000");
long physicMsgTimestamp = 0xAABB;
long logicsMsgTimestamp = 0xCCDD;
storeCheckpoint.setPhysicMsgTimestamp(physicMsgTimestamp);
storeCheckpoint.setLogicsMsgTimestamp(logicsMsgTimestamp);
storeCheckpoint.flush();
long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp();
assertTrue(diff == 3000);
storeCheckpoint.shutdown();
storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000");
assertTrue(physicMsgTimestamp == storeCheckpoint.getPhysicMsgTimestamp());
assertTrue(logicsMsgTimestamp == storeCheckpoint.getLogicsMsgTimestamp());
}
}
......@@ -25,6 +25,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
......@@ -34,47 +35,35 @@ public class IndexFileTest {
private static final int indexNum = 400;
@Test
public void test_put_index() {
try {
IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0);
for (long i = 0; i < (indexNum - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
}
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
indexFile.destroy(0);
} catch (Exception e) {
e.printStackTrace();
assertTrue(false);
public void test_put_index() throws Exception {
IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0);
for (long i = 0; i < (indexNum - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
}
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
indexFile.destroy(0);
}
@Test
public void test_put_get_index() {
try {
IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0);
for (long i = 0; i < (indexNum - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
}
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
final List<Long> phyOffsets = new ArrayList<Long>();
indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
for (Long offset : phyOffsets) {
System.out.println(offset);
}
assertFalse(phyOffsets.isEmpty());
indexFile.destroy(0);
} catch (Exception e) {
e.printStackTrace();
assertTrue(false);
public void test_put_get_index() throws Exception {
IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0);
for (long i = 0; i < (indexNum - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
}
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
final List<Long> phyOffsets = new ArrayList<Long>();
indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
assertFalse(phyOffsets.isEmpty());
assertEquals(1, phyOffsets.size());
indexFile.destroy(0);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain producerGroup copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n</pattern>
</encoder>
</appender>
<logger name="com.alibaba.rocketmq.store" level="WARN" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
</configuration>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册