diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index ad4ca9192e58f1aabec9d9a597e268feb57e450b..d0f729340bdafc2bd82d040e82ca51234b457b7d 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; @@ -123,6 +124,8 @@ public class DefaultMessageStoreTest { messageStore.putMessage(buildMessage()); } + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); + for (long i = 0; i < totalMsgs; i++) { GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); assertThat(result).isNotNull(); @@ -180,7 +183,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); for (AppendMessageResult appendMessageResult : appendMessageResults) { @@ -198,7 +202,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); int skewing = 2; ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); @@ -222,7 +227,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); int skewing = 20000; ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); @@ -235,6 +241,9 @@ public class DefaultMessageStoreTest { assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount - 1].getWroteBytes()); assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset()); assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes()); + + indexBuffer.release(); + indexBuffer2.release(); } } @@ -245,7 +254,9 @@ public class DefaultMessageStoreTest { int wrongQueueId = 1; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); - Thread.sleep(10); + //Thread.sleep(10); + + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, appendMessageResults[0].getStoreTimestamp()); @@ -259,7 +270,8 @@ public class DefaultMessageStoreTest { int wrongQueueId = 1; String topic = "FooBar"; putMessages(totalCount, topic, queueId, false); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0); @@ -273,7 +285,9 @@ public class DefaultMessageStoreTest { int wrongQueueId = 1; String topic = "FooBar"; putMessages(totalCount, topic, queueId, true); - Thread.sleep(10); + //Thread.sleep(10); + + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1); @@ -287,7 +301,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue(); @@ -310,7 +325,8 @@ public class DefaultMessageStoreTest { int queueId = 0; String topic = "FooBar"; AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); - Thread.sleep(10); + //Thread.sleep(10); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, queueId); for (int i = 0; i < totalCount; i++) { @@ -412,6 +428,8 @@ public class DefaultMessageStoreTest { master.putMessage(buildMessage()); } + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); + for (long i = 0; i < totalMsgs; i++) { GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); assertThat(result).isNotNull(); @@ -432,16 +450,21 @@ public class DefaultMessageStoreTest { } // wait for consume queue build // the sleep time should be great than consume queue flush interval - Thread.sleep(100); + //Thread.sleep(100); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); String group = "simple"; GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null); assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32); + getMessageResult32.release(); GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null); assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20); + getMessageResult20.release(); GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null); assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10); + getMessageResult45.release(); + } @Test @@ -455,7 +478,9 @@ public class DefaultMessageStoreTest { messageStore.putMessage(messageExtBrokerInner); } - Thread.sleep(100);//wait for build consumer queue + // Thread.sleep(100);//wait for build consumer queue + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); + long maxPhyOffset = messageStore.getMaxPhyOffset(); long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); @@ -475,7 +500,8 @@ public class DefaultMessageStoreTest { messageExtBrokerInner.setQueueId(0); messageStore.putMessage(messageExtBrokerInner); } - Thread.sleep(100); + //Thread.sleep(100); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); long secondLastPhyOffset = messageStore.getMaxPhyOffset(); long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); @@ -504,7 +530,8 @@ public class DefaultMessageStoreTest { messageExtBrokerInner.setQueueId(0); messageStore.putMessage(messageExtBrokerInner); } - Thread.sleep(100); + //Thread.sleep(100); + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); secondLastPhyOffset = messageStore.getMaxPhyOffset(); secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);