提交 5595fe39 编写于 作者: C chenhoudao

delete unit test releated file

上级 d0b0fa93
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
public class DefaultMessageStoreTest {
private final String StoreMessage = "Once, there was a chance for me!";
private int QUEUE_TOTAL = 100;
private AtomicInteger QueueId = new AtomicInteger(0);
private SocketAddress BornHost;
private SocketAddress StoreHost;
private byte[] MessageBody;
private MessageStore messageStore;
@Before
public void init() throws Exception {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
}
@Test(expected = OverlappingFileLockException.class)
public void test_repate_restart() throws Exception {
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
boolean load = master.load();
assertTrue(load);
try {
master.start();
master.start();
} finally {
master.shutdown();
master.destroy();
}
}
@After
public void destory() {
messageStore.shutdown();
messageStore.destroy();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathRootDir());
UtilAll.deleteFile(file);
}
private MessageStore buildMessageStore(FlushDiskType flushType) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(flushType);
messageStoreConfig.setFlushIntervalConsumeQueue(1);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
}
@Test
public void testWriteAndRead() throws Exception {
long totalMsgs = 10;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
for (long i = 0; i < totalMsgs; i++) {
messageStore.putMessage(buildMessage());
}
Thread.sleep(100);//wait for build consumer queue
//reboot,messageStore start,flushType is ASYNC_FLUSH
messageStore.shutdown();
messageStore = buildMessageStore(FlushDiskType.ASYNC_FLUSH);
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
MessageBody = StoreMessage.getBytes();
for (long i = 0; i < totalMsgs; i++) {
messageStore.putMessage(buildMessage());
}
Thread.sleep(200);//wait for build consumer queue
//reboot,messageStore start,flushType is SYNC_FLUSH
messageStore.shutdown();
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
load = messageStore.load();
assertTrue(load);
messageStore.start();
totalMsgs = 2 * totalMsgs;
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
result.release();
}
verifyThatMasterIsFunctional( totalMsgs, messageStore);
}
private MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("FooBar");
msg.setTags("TAG1");
msg.setKeys("Hello");
msg.setBody(MessageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
//setKeys
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
return msg;
}
private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) {
for (long i = 0; i < totalMsgs; i++) {
master.putMessage(buildMessage());
}
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
result.release();
}
}
@Test
public void testPullSize() throws Exception {
String topic = "pullSizeTopic";
for (int i = 0; i < 32; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
// wait for consume queue build
// the sleep time should be great than consume queue flush interval
Thread.sleep(100);
String group = "simple";
GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null);
assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null);
assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20);
GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null);
assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
}
@Test
public void testRecover() throws Exception {
String topic = "recoverTopic";
MessageBody = StoreMessage.getBytes();
for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);//wait for build consumer queue
long maxPhyOffset = messageStore.getMaxPhyOffset();
long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
//1.just reboot
messageStore.shutdown();
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
//2.damage commitlog and reboot normal
for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
long secondLastPhyOffset = messageStore.getMaxPhyOffset();
long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
messageStore.shutdown();
//damage last message
damageCommitlog(secondLastPhyOffset);
//reboot
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
//3.damage commitlog and reboot abnormal
for (int i = 0; i < 100; i++) {
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
secondLastPhyOffset = messageStore.getMaxPhyOffset();
secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
messageStore.shutdown();
//damage last message
damageCommitlog(secondLastPhyOffset);
//add abort file
String fileName = StorePathConfigHelper.getAbortFile(((DefaultMessageStore) messageStore).getMessageStoreConfig().getStorePathRootDir());
File file = new File(fileName);
MappedFile.ensureDirOK(file.getParent());
file.createNewFile();
messageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
//message write again
for (int i = 0; i < 100; i++) {
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
}
private void damageCommitlog(long offset) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);
int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
int topicLenIndex = (int) offset + 84 + bodyLen + 4;
mappedByteBuffer.position(topicLenIndex);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.force();
fileChannel.force(true);
fileChannel.close();
}
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
byte[] filterBitMap, Map<String, String> properties) {
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册