提交 ede33c1d 编写于 作者: H hdchen 提交者: dinglei

[RIP-10] optimization test case of DefaultMessageStore (#995)

上级 690a406e
...@@ -40,6 +40,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper; ...@@ -40,6 +40,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
...@@ -123,6 +124,8 @@ public class DefaultMessageStoreTest { ...@@ -123,6 +124,8 @@ public class DefaultMessageStoreTest {
messageStore.putMessage(buildMessage()); messageStore.putMessage(buildMessage());
} }
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
for (long i = 0; i < totalMsgs; i++) { for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull(); assertThat(result).isNotNull();
...@@ -180,7 +183,8 @@ public class DefaultMessageStoreTest { ...@@ -180,7 +183,8 @@ public class DefaultMessageStoreTest {
int queueId = 0; int queueId = 0;
String topic = "FooBar"; String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
Thread.sleep(10); //Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
for (AppendMessageResult appendMessageResult : appendMessageResults) { for (AppendMessageResult appendMessageResult : appendMessageResults) {
...@@ -198,7 +202,8 @@ public class DefaultMessageStoreTest { ...@@ -198,7 +202,8 @@ public class DefaultMessageStoreTest {
int queueId = 0; int queueId = 0;
String topic = "FooBar"; String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
Thread.sleep(10); //Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
int skewing = 2; int skewing = 2;
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
...@@ -222,7 +227,8 @@ public class DefaultMessageStoreTest { ...@@ -222,7 +227,8 @@ public class DefaultMessageStoreTest {
int queueId = 0; int queueId = 0;
String topic = "FooBar"; String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true); AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
Thread.sleep(10); //Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
int skewing = 20000; int skewing = 20000;
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
...@@ -235,6 +241,9 @@ public class DefaultMessageStoreTest { ...@@ -235,6 +241,9 @@ public class DefaultMessageStoreTest {
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount - 1].getWroteBytes()); assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount - 1].getWroteBytes());
assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset()); assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset());
assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes()); assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes());
indexBuffer.release();
indexBuffer2.release();
} }
} }
...@@ -245,7 +254,9 @@ public class DefaultMessageStoreTest { ...@@ -245,7 +254,9 @@ public class DefaultMessageStoreTest {
int wrongQueueId = 1; int wrongQueueId = 1;
String topic = "FooBar"; String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
Thread.sleep(10); //Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, appendMessageResults[0].getStoreTimestamp()); long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, appendMessageResults[0].getStoreTimestamp());
...@@ -259,7 +270,8 @@ public class DefaultMessageStoreTest { ...@@ -259,7 +270,8 @@ public class DefaultMessageStoreTest {
int wrongQueueId = 1; int wrongQueueId = 1;
String topic = "FooBar"; String topic = "FooBar";
putMessages(totalCount, topic, queueId, false); putMessages(totalCount, topic, queueId, false);
Thread.sleep(10); //Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0); long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0);
...@@ -273,7 +285,9 @@ public class DefaultMessageStoreTest { ...@@ -273,7 +285,9 @@ public class DefaultMessageStoreTest {
int wrongQueueId = 1; int wrongQueueId = 1;
String topic = "FooBar"; String topic = "FooBar";
putMessages(totalCount, topic, queueId, true); putMessages(totalCount, topic, queueId, true);
Thread.sleep(10); //Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1); long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1);
...@@ -287,7 +301,8 @@ public class DefaultMessageStoreTest { ...@@ -287,7 +301,8 @@ public class DefaultMessageStoreTest {
int queueId = 0; int queueId = 0;
String topic = "FooBar"; String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
Thread.sleep(10); //Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId); ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue(); int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue();
...@@ -310,7 +325,8 @@ public class DefaultMessageStoreTest { ...@@ -310,7 +325,8 @@ public class DefaultMessageStoreTest {
int queueId = 0; int queueId = 0;
String topic = "FooBar"; String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false); AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
Thread.sleep(10); //Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, queueId); ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, queueId);
for (int i = 0; i < totalCount; i++) { for (int i = 0; i < totalCount; i++) {
...@@ -412,6 +428,8 @@ public class DefaultMessageStoreTest { ...@@ -412,6 +428,8 @@ public class DefaultMessageStoreTest {
master.putMessage(buildMessage()); master.putMessage(buildMessage());
} }
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
for (long i = 0; i < totalMsgs; i++) { for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull(); assertThat(result).isNotNull();
...@@ -432,16 +450,21 @@ public class DefaultMessageStoreTest { ...@@ -432,16 +450,21 @@ public class DefaultMessageStoreTest {
} }
// wait for consume queue build // wait for consume queue build
// the sleep time should be great than consume queue flush interval // the sleep time should be great than consume queue flush interval
Thread.sleep(100); //Thread.sleep(100);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
String group = "simple"; String group = "simple";
GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null); GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null);
assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32); assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
getMessageResult32.release();
GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null); GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null);
assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20); assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20);
getMessageResult20.release();
GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null); GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null);
assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10); assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
getMessageResult45.release();
} }
@Test @Test
...@@ -455,7 +478,9 @@ public class DefaultMessageStoreTest { ...@@ -455,7 +478,9 @@ public class DefaultMessageStoreTest {
messageStore.putMessage(messageExtBrokerInner); messageStore.putMessage(messageExtBrokerInner);
} }
Thread.sleep(100);//wait for build consumer queue // Thread.sleep(100);//wait for build consumer queue
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long maxPhyOffset = messageStore.getMaxPhyOffset(); long maxPhyOffset = messageStore.getMaxPhyOffset();
long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
...@@ -475,7 +500,8 @@ public class DefaultMessageStoreTest { ...@@ -475,7 +500,8 @@ public class DefaultMessageStoreTest {
messageExtBrokerInner.setQueueId(0); messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner); messageStore.putMessage(messageExtBrokerInner);
} }
Thread.sleep(100); //Thread.sleep(100);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long secondLastPhyOffset = messageStore.getMaxPhyOffset(); long secondLastPhyOffset = messageStore.getMaxPhyOffset();
long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
...@@ -504,7 +530,8 @@ public class DefaultMessageStoreTest { ...@@ -504,7 +530,8 @@ public class DefaultMessageStoreTest {
messageExtBrokerInner.setQueueId(0); messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner); messageStore.putMessage(messageExtBrokerInner);
} }
Thread.sleep(100); //Thread.sleep(100);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
secondLastPhyOffset = messageStore.getMaxPhyOffset(); secondLastPhyOffset = messageStore.getMaxPhyOffset();
secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册