From 1128a5ad3757906d81fe6f5de710106245091a75 Mon Sep 17 00:00:00 2001 From: Frank Lin Date: Wed, 20 Feb 2019 16:25:24 +0800 Subject: [PATCH] [RIP-10]Add test cases for DefaultMessageStore which methods read from CommitLog/ConsumeQueue (#778) [RIP-10]Add test cases for DefaultMessageStore which methods read from CommitLog/ConsumeQueue --- .../store/DefaultMessageStoreTest.java | 283 +++++++++++++++++- 1 file changed, 276 insertions(+), 7 deletions(-) diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 57b6999c..ad4ca919 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -19,28 +19,36 @@ package org.apache.rocketmq.store; import java.io.File; import java.io.RandomAccessFile; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.OverlappingFileLockException; import java.util.Map; +import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; -import org.junit.After; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +@RunWith(MockitoJUnitRunner.class) public class DefaultMessageStoreTest { private final String StoreMessage = "Once, there was a chance for me!"; private int QUEUE_TOTAL = 100; @@ -62,7 +70,7 @@ public class DefaultMessageStoreTest { } @Test(expected = OverlappingFileLockException.class) - public void test_repate_restart() throws Exception { + public void test_repeat_restart() throws Exception { QUEUE_TOTAL = 1; MessageBody = StoreMessage.getBytes(); @@ -86,7 +94,7 @@ public class DefaultMessageStoreTest { } @After - public void destory() { + public void destroy() { messageStore.shutdown(); messageStore.destroy(); @@ -123,12 +131,269 @@ public class DefaultMessageStoreTest { verifyThatMasterIsFunctional(totalMsgs, messageStore); } - private MessageExtBrokerInner buildMessage() { + @Test + public void should_look_message_successfully_when_offset_is_first() { + final int totalCount = 10; + int queueId = new Random().nextInt(10); + String topic = "FooBar"; + int firstOffset = 0; + AppendMessageResult[] appendMessageResultArray = putMessages(totalCount, topic, queueId); + AppendMessageResult firstResult = appendMessageResultArray[0]; + + MessageExt messageExt = messageStore.lookMessageByOffset(firstResult.getWroteOffset()); + MessageExt messageExt1 = getDefaultMessageStore().lookMessageByOffset(firstResult.getWroteOffset(), firstResult.getWroteBytes()); + + assertThat(new String(messageExt.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage, firstOffset)); + assertThat(new String(messageExt1.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage, firstOffset)); + } + + @Test + public void should_look_message_successfully_when_offset_is_last() { + final int totalCount = 10; + int queueId = new Random().nextInt(10); + String topic = "FooBar"; + AppendMessageResult[] appendMessageResultArray = putMessages(totalCount, topic, queueId); + int lastIndex = totalCount - 1; + AppendMessageResult lastResult = appendMessageResultArray[lastIndex]; + + MessageExt messageExt = getDefaultMessageStore().lookMessageByOffset(lastResult.getWroteOffset(), lastResult.getWroteBytes()); + + assertThat(new String(messageExt.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage, lastIndex)); + } + + @Test + public void should_look_message_failed_and_return_null_when_offset_is_out_of_bound() { + final int totalCount = 10; + int queueId = new Random().nextInt(10); + String topic = "FooBar"; + AppendMessageResult[] appendMessageResultArray = putMessages(totalCount, topic, queueId); + long lastOffset = getMaxOffset(appendMessageResultArray); + + MessageExt messageExt = getDefaultMessageStore().lookMessageByOffset(lastOffset); + + assertThat(messageExt).isNull(); + } + + @Test + public void should_get_consume_queue_offset_successfully_when_incomming_by_timestamp() throws InterruptedException { + final int totalCount = 10; + int queueId = 0; + String topic = "FooBar"; + AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); + Thread.sleep(10); + + ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); + for (AppendMessageResult appendMessageResult : appendMessageResults) { + long offset = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp()); + SelectMappedBufferResult indexBuffer = consumeQueue.getIndexBuffer(offset); + assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset()); + assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes()); + indexBuffer.release(); + } + } + + @Test + public void should_get_consume_queue_offset_successfully_when_timestamp_is_skewing() throws InterruptedException { + final int totalCount = 10; + int queueId = 0; + String topic = "FooBar"; + AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); + Thread.sleep(10); + int skewing = 2; + + ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); + for (AppendMessageResult appendMessageResult : appendMessageResults) { + long offset = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp() + skewing); + long offset2 = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp() - skewing); + SelectMappedBufferResult indexBuffer = consumeQueue.getIndexBuffer(offset); + SelectMappedBufferResult indexBuffer2 = consumeQueue.getIndexBuffer(offset2); + assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset()); + assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes()); + assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset()); + assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes()); + indexBuffer.release(); + indexBuffer2.release(); + } + } + + @Test + public void should_get_min_of_max_consume_queue_offset_when_timestamp_s_skewing_is_large() throws InterruptedException { + final int totalCount = 10; + int queueId = 0; + String topic = "FooBar"; + AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); + Thread.sleep(10); + int skewing = 20000; + + ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); + for (AppendMessageResult appendMessageResult : appendMessageResults) { + long offset = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp() + skewing); + long offset2 = messageStore.getOffsetInQueueByTime(topic, queueId, appendMessageResult.getStoreTimestamp() - skewing); + SelectMappedBufferResult indexBuffer = consumeQueue.getIndexBuffer(offset); + SelectMappedBufferResult indexBuffer2 = consumeQueue.getIndexBuffer(offset2); + assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResults[totalCount - 1].getWroteOffset()); + assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount - 1].getWroteBytes()); + assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset()); + assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes()); + } + } + + @Test + public void should_return_zero_when_consume_queue_not_found() throws InterruptedException { + final int totalCount = 10; + int queueId = 0; + int wrongQueueId = 1; + String topic = "FooBar"; + AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); + Thread.sleep(10); + + long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, appendMessageResults[0].getStoreTimestamp()); + + assertThat(offset).isEqualTo(0); + } + + @Test + public void should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_consume_queue_not_found() throws InterruptedException { + final int totalCount = 10; + int queueId = 0; + int wrongQueueId = 1; + String topic = "FooBar"; + putMessages(totalCount, topic, queueId, false); + Thread.sleep(10); + + long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0); + + assertThat(messageStoreTimeStamp).isEqualTo(-1); + } + + @Test + public void should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_consumeQueueOffset_not_exist() throws InterruptedException { + final int totalCount = 10; + int queueId = 0; + int wrongQueueId = 1; + String topic = "FooBar"; + putMessages(totalCount, topic, queueId, true); + Thread.sleep(10); + + long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1); + + assertThat(messageStoreTimeStamp).isEqualTo(-1); + } + + + @Test + public void should_get_message_store_timestamp_successfully_when_incomming_by_topic_queueId_and_consumeQueueOffset() throws InterruptedException { + final int totalCount = 10; + int queueId = 0; + String topic = "FooBar"; + AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); + Thread.sleep(10); + + ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); + int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue(); + for (int i = minOffsetInQueue; i < consumeQueue.getMaxOffsetInQueue(); i++) { + long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, queueId, i); + assertThat(messageStoreTimeStamp).isEqualTo(appendMessageResults[i].getStoreTimestamp()); + } + } + + @Test + public void should_return_negative_one_when_invoke_getStoreTime_if_incomming_param_is_null() { + long storeTime = getStoreTime(null); + + assertThat(storeTime).isEqualTo(-1); + } + + @Test + public void should_get_store_time_successfully_when_invoke_getStoreTime_if_everything_is_ok() throws InterruptedException { + final int totalCount = 10; + int queueId = 0; + String topic = "FooBar"; + AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); + Thread.sleep(10); + ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, queueId); + + for (int i = 0; i < totalCount; i++) { + SelectMappedBufferResult indexBuffer = consumeQueue.getIndexBuffer(i); + long storeTime = getStoreTime(indexBuffer); + assertThat(storeTime).isEqualTo(appendMessageResults[i].getStoreTimestamp()); + indexBuffer.release(); + } + } + + @Test + public void should_return_negative_one_when_invoke_getStoreTime_if_phyOffset_is_less_than_commitLog_s_minOffset() { + long phyOffset = -10; + int size = 138; + ByteBuffer byteBuffer = ByteBuffer.allocate(100); + byteBuffer.putLong(phyOffset); + byteBuffer.putInt(size); + byteBuffer.flip(); + MappedFile mappedFile = mock(MappedFile.class); + SelectMappedBufferResult result = new SelectMappedBufferResult(0, byteBuffer, size, mappedFile); + + long storeTime = getStoreTime(result); + result.release(); + + assertThat(storeTime).isEqualTo(-1); + } + + private DefaultMessageStore getDefaultMessageStore() { + return (DefaultMessageStore)this.messageStore; + } + + private AppendMessageResult[] putMessages(int totalCount, String topic, int queueId) { + return putMessages(totalCount, topic, queueId, false); + } + + private AppendMessageResult[] putMessages(int totalCount, String topic, int queueId, boolean interval) { + AppendMessageResult[] appendMessageResultArray = new AppendMessageResult[totalCount]; + for (int i = 0; i < totalCount; i++) { + String messageBody = buildMessageBodyByOffset(StoreMessage, i); + MessageExtBrokerInner msgInner = buildMessage(messageBody.getBytes(), topic); + msgInner.setQueueId(queueId); + PutMessageResult result = messageStore.putMessage(msgInner); + appendMessageResultArray[i] = result.getAppendMessageResult(); + assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); + if (interval) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException("Thread sleep ERROR"); + } + } + } + return appendMessageResultArray; + } + + private long getMaxOffset(AppendMessageResult[] appendMessageResultArray) { + if (appendMessageResultArray == null) { + return 0; + } + AppendMessageResult last = appendMessageResultArray[appendMessageResultArray.length - 1]; + return last.getWroteOffset() + last.getWroteBytes(); + } + + private String buildMessageBodyByOffset(String message, long i) { + return String.format("%s offset %d", message, i); + } + + private long getStoreTime(SelectMappedBufferResult result) { + try { + Method getStoreTime = getDefaultMessageStore().getClass().getDeclaredMethod("getStoreTime", SelectMappedBufferResult.class); + getStoreTime.setAccessible(true); + return (long)getStoreTime.invoke(getDefaultMessageStore(), result); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + private MessageExtBrokerInner buildMessage(byte[] messageBody, String topic) { MessageExtBrokerInner msg = new MessageExtBrokerInner(); - msg.setTopic("FooBar"); + msg.setTopic(topic); msg.setTags("TAG1"); msg.setKeys("Hello"); - msg.setBody(MessageBody); + msg.setBody(messageBody); msg.setKeys(String.valueOf(System.currentTimeMillis())); msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); msg.setSysFlag(0); @@ -138,6 +403,10 @@ public class DefaultMessageStoreTest { return msg; } + private MessageExtBrokerInner buildMessage() { + return buildMessage(MessageBody, "FooBar"); + } + private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) { for (long i = 0; i < totalMsgs; i++) { master.putMessage(buildMessage()); -- GitLab