提交 1128a5ad 编写于 作者: F Frank Lin 提交者: dinglei

[RIP-10]Add test cases for DefaultMessageStore which methods read from...

[RIP-10]Add test cases for DefaultMessageStore which methods read from CommitLog/ConsumeQueue (#778)

[RIP-10]Add test cases for DefaultMessageStore which methods read from CommitLog/ConsumeQueue
上级 95db64dc
...@@ -19,28 +19,36 @@ package org.apache.rocketmq.store; ...@@ -19,28 +19,36 @@ package org.apache.rocketmq.store;
import java.io.File; import java.io.File;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException; import java.nio.channels.OverlappingFileLockException;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMessageStoreTest { public class DefaultMessageStoreTest {
private final String StoreMessage = "Once, there was a chance for me!"; private final String StoreMessage = "Once, there was a chance for me!";
private int QUEUE_TOTAL = 100; private int QUEUE_TOTAL = 100;
...@@ -62,7 +70,7 @@ public class DefaultMessageStoreTest { ...@@ -62,7 +70,7 @@ public class DefaultMessageStoreTest {
} }
@Test(expected = OverlappingFileLockException.class) @Test(expected = OverlappingFileLockException.class)
public void test_repate_restart() throws Exception { public void test_repeat_restart() throws Exception {
QUEUE_TOTAL = 1; QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes(); MessageBody = StoreMessage.getBytes();
...@@ -86,7 +94,7 @@ public class DefaultMessageStoreTest { ...@@ -86,7 +94,7 @@ public class DefaultMessageStoreTest {
} }
@After @After
public void destory() { public void destroy() {
messageStore.shutdown(); messageStore.shutdown();
messageStore.destroy(); messageStore.destroy();
...@@ -123,12 +131,269 @@ public class DefaultMessageStoreTest { ...@@ -123,12 +131,269 @@ public class DefaultMessageStoreTest {
verifyThatMasterIsFunctional(totalMsgs, messageStore); verifyThatMasterIsFunctional(totalMsgs, messageStore);
} }
private MessageExtBrokerInner buildMessage() { @Test
public void should_look_message_successfully_when_offset_is_first() {
final int totalCount = 10;
int queueId = new Random().nextInt(10);
String topic = "FooBar";
int firstOffset = 0;
AppendMessageResult[] appendMessageResultArray = putMessages(totalCount, topic, queueId);
AppendMessageResult firstResult = appendMessageResultArray[0];
MessageExt messageExt = messageStore.lookMessageByOffset(firstResult.getWroteOffset());
MessageExt messageExt1 = getDefaultMessageStore().lookMessageByOffset(firstResult.getWroteOffset(), firstResult.getWroteBytes());
assertThat(new String(messageExt.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage, firstOffset));
assertThat(new String(messageExt1.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage, firstOffset));
}
@Test
public void should_look_message_successfully_when_offset_is_last() {
final int totalCount = 10;
int queueId = new Random().nextInt(10);
String topic = "FooBar";
AppendMessageResult[] appendMessageResultArray = putMessages(totalCount, topic, queueId);
int lastIndex = totalCount - 1;
AppendMessageResult lastResult = appendMessageResultArray[lastIndex];
MessageExt messageExt = getDefaultMessageStore().lookMessageByOffset(lastResult.getWroteOffset(), lastResult.getWroteBytes());
assertThat(new String(messageExt.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage, lastIndex));
}
@Test
public void should_look_message_failed_and_return_null_when_offset_is_out_of_bound() {
final int totalCount = 10;
int queueId = new Random().nextInt(10);
String topic = "FooBar";
AppendMessageResult[] appendMessageResultArray = putMessages(totalCount, topic, queueId);
long lastOffset = getMaxOffset(appendMessageResultArray);
MessageExt messageExt = getDefaultMessageStore().lookMessageByOffset(lastOffset);
assertThat(messageExt).isNull();
}
@Test
public void should_get_consume_queue_offset_successfully_when_incomming_by_timestamp() throws InterruptedException {
final int totalCount = 10;
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
Thread.sleep(10);
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
for (AppendMessageResult appendMessageResult : appendMessageResults) {
long offset = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp());
SelectMappedBufferResult indexBuffer = consumeQueue.getIndexBuffer(offset);
assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset());
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes());
indexBuffer.release();
}
}
@Test
public void should_get_consume_queue_offset_successfully_when_timestamp_is_skewing() throws InterruptedException {
final int totalCount = 10;
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
Thread.sleep(10);
int skewing = 2;
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
for (AppendMessageResult appendMessageResult : appendMessageResults) {
long offset = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp() + skewing);
long offset2 = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp() - skewing);
SelectMappedBufferResult indexBuffer = consumeQueue.getIndexBuffer(offset);
SelectMappedBufferResult indexBuffer2 = consumeQueue.getIndexBuffer(offset2);
assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset());
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes());
assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset());
assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes());
indexBuffer.release();
indexBuffer2.release();
}
}
@Test
public void should_get_min_of_max_consume_queue_offset_when_timestamp_s_skewing_is_large() throws InterruptedException {
final int totalCount = 10;
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
Thread.sleep(10);
int skewing = 20000;
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
for (AppendMessageResult appendMessageResult : appendMessageResults) {
long offset = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp() + skewing);
long offset2 = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp() - skewing);
SelectMappedBufferResult indexBuffer = consumeQueue.getIndexBuffer(offset);
SelectMappedBufferResult indexBuffer2 = consumeQueue.getIndexBuffer(offset2);
assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResults[totalCount - 1].getWroteOffset());
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount - 1].getWroteBytes());
assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset());
assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes());
}
}
@Test
public void should_return_zero_when_consume_queue_not_found() throws InterruptedException {
final int totalCount = 10;
int queueId = 0;
int wrongQueueId = 1;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
Thread.sleep(10);
long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, appendMessageResults[0].getStoreTimestamp());
assertThat(offset).isEqualTo(0);
}
@Test
public void should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_consume_queue_not_found() throws InterruptedException {
final int totalCount = 10;
int queueId = 0;
int wrongQueueId = 1;
String topic = "FooBar";
putMessages(totalCount, topic, queueId, false);
Thread.sleep(10);
long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0);
assertThat(messageStoreTimeStamp).isEqualTo(-1);
}
@Test
public void should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_consumeQueueOffset_not_exist() throws InterruptedException {
final int totalCount = 10;
int queueId = 0;
int wrongQueueId = 1;
String topic = "FooBar";
putMessages(totalCount, topic, queueId, true);
Thread.sleep(10);
long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1);
assertThat(messageStoreTimeStamp).isEqualTo(-1);
}
@Test
public void should_get_message_store_timestamp_successfully_when_incomming_by_topic_queueId_and_consumeQueueOffset() throws InterruptedException {
final int totalCount = 10;
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
Thread.sleep(10);
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue();
for (int i = minOffsetInQueue; i < consumeQueue.getMaxOffsetInQueue(); i++) {
long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, queueId, i);
assertThat(messageStoreTimeStamp).isEqualTo(appendMessageResults[i].getStoreTimestamp());
}
}
@Test
public void should_return_negative_one_when_invoke_getStoreTime_if_incomming_param_is_null() {
long storeTime = getStoreTime(null);
assertThat(storeTime).isEqualTo(-1);
}
@Test
public void should_get_store_time_successfully_when_invoke_getStoreTime_if_everything_is_ok() throws InterruptedException {
final int totalCount = 10;
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
Thread.sleep(10);
ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, queueId);
for (int i = 0; i < totalCount; i++) {
SelectMappedBufferResult indexBuffer = consumeQueue.getIndexBuffer(i);
long storeTime = getStoreTime(indexBuffer);
assertThat(storeTime).isEqualTo(appendMessageResults[i].getStoreTimestamp());
indexBuffer.release();
}
}
@Test
public void should_return_negative_one_when_invoke_getStoreTime_if_phyOffset_is_less_than_commitLog_s_minOffset() {
long phyOffset = -10;
int size = 138;
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
byteBuffer.putLong(phyOffset);
byteBuffer.putInt(size);
byteBuffer.flip();
MappedFile mappedFile = mock(MappedFile.class);
SelectMappedBufferResult result = new SelectMappedBufferResult(0, byteBuffer, size, mappedFile);
long storeTime = getStoreTime(result);
result.release();
assertThat(storeTime).isEqualTo(-1);
}
private DefaultMessageStore getDefaultMessageStore() {
return (DefaultMessageStore)this.messageStore;
}
private AppendMessageResult[] putMessages(int totalCount, String topic, int queueId) {
return putMessages(totalCount, topic, queueId, false);
}
private AppendMessageResult[] putMessages(int totalCount, String topic, int queueId, boolean interval) {
AppendMessageResult[] appendMessageResultArray = new AppendMessageResult[totalCount];
for (int i = 0; i < totalCount; i++) {
String messageBody = buildMessageBodyByOffset(StoreMessage, i);
MessageExtBrokerInner msgInner = buildMessage(messageBody.getBytes(), topic);
msgInner.setQueueId(queueId);
PutMessageResult result = messageStore.putMessage(msgInner);
appendMessageResultArray[i] = result.getAppendMessageResult();
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
if (interval) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException("Thread sleep ERROR");
}
}
}
return appendMessageResultArray;
}
private long getMaxOffset(AppendMessageResult[] appendMessageResultArray) {
if (appendMessageResultArray == null) {
return 0;
}
AppendMessageResult last = appendMessageResultArray[appendMessageResultArray.length - 1];
return last.getWroteOffset() + last.getWroteBytes();
}
private String buildMessageBodyByOffset(String message, long i) {
return String.format("%s offset %d", message, i);
}
private long getStoreTime(SelectMappedBufferResult result) {
try {
Method getStoreTime = getDefaultMessageStore().getClass().getDeclaredMethod("getStoreTime", SelectMappedBufferResult.class);
getStoreTime.setAccessible(true);
return (long)getStoreTime.invoke(getDefaultMessageStore(), result);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
private MessageExtBrokerInner buildMessage(byte[] messageBody, String topic) {
MessageExtBrokerInner msg = new MessageExtBrokerInner(); MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("FooBar"); msg.setTopic(topic);
msg.setTags("TAG1"); msg.setTags("TAG1");
msg.setKeys("Hello"); msg.setKeys("Hello");
msg.setBody(MessageBody); msg.setBody(messageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis())); msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(0); msg.setSysFlag(0);
...@@ -138,6 +403,10 @@ public class DefaultMessageStoreTest { ...@@ -138,6 +403,10 @@ public class DefaultMessageStoreTest {
return msg; return msg;
} }
private MessageExtBrokerInner buildMessage() {
return buildMessage(MessageBody, "FooBar");
}
private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) { private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) {
for (long i = 0; i < totalMsgs; i++) { for (long i = 0; i < totalMsgs; i++) {
master.putMessage(buildMessage()); master.putMessage(buildMessage());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册