From ede33c1d17f7c71fa216e7dba49bbc8b301f2429 Mon Sep 17 00:00:00 2001 From: hdchen <14106312@qq.com> Date: Wed, 6 Mar 2019 21:44:31 +0800 Subject: [PATCH] [RIP-10] optimization test case of DefaultMessageStore (#995) --- .../store/DefaultMessageStoreTest.java | 51 ++++++++++++++----- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index ad4ca919..d0f72934 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; @@ -123,6 +124,8 @@ public class DefaultMessageStoreTest { messageStore.putMessage(buildMessage()); } + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); + for (long i = 0; i < totalMsgs; i++) { GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); assertThat(result).isNotNull(); @@ -180,7 +183,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); for (AppendMessageResult appendMessageResult : appendMessageResults) { @@ -198,7 +202,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); int skewing = 2; ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); @@ -222,7 +227,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); int skewing = 20000; ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); @@ -235,6 +241,9 @@ public class DefaultMessageStoreTest { assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount - 1].getWroteBytes()); assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset()); assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes()); + + indexBuffer.release(); + indexBuffer2.release(); } } @@ -245,7 +254,9 @@ public class DefaultMessageStoreTest { int wrongQueueId = 1; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); - Thread.sleep(10); + //Thread.sleep(10); + + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, appendMessageResults[0].getStoreTimestamp()); @@ -259,7 +270,8 @@ public class DefaultMessageStoreTest { int wrongQueueId = 1; String topic = "FooBar"; putMessages(totalCount, topic, queueId, false); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0); @@ -273,7 +285,9 @@ public class DefaultMessageStoreTest { int wrongQueueId = 1; String topic = "FooBar"; putMessages(totalCount, topic, queueId, true); - Thread.sleep(10); + //Thread.sleep(10); + + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1); @@ -287,7 +301,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue(); @@ -310,7 +325,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, queueId); for (int i = 0; i < totalCount; i++) { @@ -412,6 +428,8 @@ public class DefaultMessageStoreTest { master.putMessage(buildMessage()); } + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); + for (long i = 0; i < totalMsgs; i++) { GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); assertThat(result).isNotNull(); @@ -432,16 +450,21 @@ public class DefaultMessageStoreTest { } // wait for consume queue build // the sleep time should be great than consume queue flush interval - Thread.sleep(100); + //Thread.sleep(100); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); String group = "simple"; GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null); assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32); + getMessageResult32.release(); GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null); assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20); + getMessageResult20.release(); GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null); assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10); + getMessageResult45.release(); + } @Test @@ -455,7 +478,9 @@ public class DefaultMessageStoreTest { messageStore.putMessage(messageExtBrokerInner); } - Thread.sleep(100);//wait for build consumer queue + // Thread.sleep(100);//wait for build consumer queue + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); + long maxPhyOffset = messageStore.getMaxPhyOffset(); long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); @@ -475,7 +500,8 @@ public class DefaultMessageStoreTest { messageExtBrokerInner.setQueueId(0); messageStore.putMessage(messageExtBrokerInner); } - Thread.sleep(100); + //Thread.sleep(100); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); long secondLastPhyOffset = messageStore.getMaxPhyOffset(); long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); @@ -504,7 +530,8 @@ public class DefaultMessageStoreTest { messageExtBrokerInner.setQueueId(0); messageStore.putMessage(messageExtBrokerInner); } - Thread.sleep(100); + //Thread.sleep(100); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); secondLastPhyOffset = messageStore.getMaxPhyOffset(); secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); -- GitLab