提交 d0b0fa93 编写于 作者: C chenhoudao

modify test case,cover FlushRealTimeService

上级 45a78aee
......@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
......@@ -55,7 +56,7 @@ public class DefaultMessageStoreTest {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
messageStore = buildMessageStore();
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
......@@ -95,19 +96,19 @@ public class DefaultMessageStoreTest {
UtilAll.deleteFile(file);
}
private MessageStore buildMessageStore() throws Exception {
private MessageStore buildMessageStore(FlushDiskType flushType) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
messageStoreConfig.setFlushDiskType(flushType);
messageStoreConfig.setFlushIntervalConsumeQueue(1);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
}
@Test
public void testWriteAndRead() {
public void testWriteAndRead() throws Exception {
long totalMsgs = 10;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
......@@ -115,12 +116,38 @@ public class DefaultMessageStoreTest {
messageStore.putMessage(buildMessage());
}
Thread.sleep(100);//wait for build consumer queue
//reboot,messageStore start,flushType is ASYNC_FLUSH
messageStore.shutdown();
messageStore = buildMessageStore(FlushDiskType.ASYNC_FLUSH);
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
MessageBody = StoreMessage.getBytes();
for (long i = 0; i < totalMsgs; i++) {
messageStore.putMessage(buildMessage());
}
Thread.sleep(200);//wait for build consumer queue
//reboot,messageStore start,flushType is SYNC_FLUSH
messageStore.shutdown();
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
load = messageStore.load();
assertTrue(load);
messageStore.start();
totalMsgs = 2 * totalMsgs;
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
result.release();
}
verifyThatMasterIsFunctional(totalMsgs, messageStore);
verifyThatMasterIsFunctional( totalMsgs, messageStore);
}
private MessageExtBrokerInner buildMessage() {
......@@ -135,6 +162,8 @@ public class DefaultMessageStoreTest {
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
//setKeys
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
return msg;
}
......@@ -144,7 +173,7 @@ public class DefaultMessageStoreTest {
}
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
GetMessageResult result = master.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
result.release();
......@@ -192,7 +221,7 @@ public class DefaultMessageStoreTest {
//1.just reboot
messageStore.shutdown();
messageStore = buildMessageStore();
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
......@@ -221,7 +250,7 @@ public class DefaultMessageStoreTest {
damageCommitlog(secondLastPhyOffset);
//reboot
messageStore = buildMessageStore();
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
load = messageStore.load();
assertTrue(load);
messageStore.start();
......@@ -253,7 +282,7 @@ public class DefaultMessageStoreTest {
MappedFile.ensureDirOK(file.getParent());
file.createNewFile();
messageStore = buildMessageStore();
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
load = messageStore.load();
assertTrue(load);
messageStore.start();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册