提交 fe724b30 编写于 作者: rushsky518's avatar rushsky518

add delete IndexFile

上级 f8f6fbe4
......@@ -19,8 +19,12 @@ package org.apache.rocketmq.store;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.index.IndexFile;
import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
......@@ -31,7 +35,9 @@ import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
......@@ -52,6 +58,7 @@ public class DefaultMessageStoreCleanFilesTest {
private SocketAddress storeHost;
private String topic = "test";
private String keys = "hello";
private int queueId = 0;
private int fileCountCommitLog = 55;
// exactly one message per CommitLog file.
......@@ -87,6 +94,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
int fileCountIndexFile = getFileCountIndexFile();
assertEquals(fileCountIndexFile, getIndexFileList().size());
int expireFileCount = 15;
expireFiles(commitLogQueue, expireFileCount);
......@@ -101,6 +111,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile);
assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
int msgCountPerIndexFile = getMsgCountPerIndexFile();
int expectDeleteCountIndexFile = (int) Math.floor((double) expectDeletedCount / msgCountPerIndexFile);
assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size());
}
}
......@@ -126,6 +140,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
int fileCountIndexFile = getFileCountIndexFile();
assertEquals(fileCountIndexFile, getIndexFileList().size());
int expireFileCount = 15;
expireFiles(commitLogQueue, expireFileCount);
......@@ -140,6 +157,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile);
assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
int msgCountPerIndexFile = getMsgCountPerIndexFile();
int expectDeleteCountIndexFile = (int) Math.floor((double) expectDeletedCount / msgCountPerIndexFile);
assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size());
}
}
......@@ -165,6 +186,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
int fileCountIndexFile = getFileCountIndexFile();
assertEquals(fileCountIndexFile, getIndexFileList().size());
// In this case, there is no need to expire the files.
// int expireFileCount = 15;
// expireFiles(commitLogQueue, expireFileCount);
......@@ -181,6 +205,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double) (a * 10) / msgCountPerFile);
assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
int msgCountPerIndexFile = getMsgCountPerIndexFile();
int expectDeleteCountIndexFile = (int) Math.floor((double) (a * 10) / msgCountPerIndexFile);
assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size());
}
}
......@@ -208,6 +236,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
int fileCountIndexFile = getFileCountIndexFile();
assertEquals(fileCountIndexFile, getIndexFileList().size());
int expireFileCount = 15;
expireFiles(commitLogQueue, expireFileCount);
......@@ -222,6 +253,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile);
assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
int msgCountPerIndexFile = getMsgCountPerIndexFile();
int expectDeleteCountIndexFile = (int) Math.floor((double) (a * 10) / msgCountPerIndexFile);
assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size());
}
}
......@@ -274,32 +309,60 @@ public class DefaultMessageStoreCleanFilesTest {
return fileQueue;
}
private ArrayList<IndexFile> getIndexFileList() throws Exception {
Field indexServiceField = messageStore.getClass().getDeclaredField("indexService");
indexServiceField.setAccessible(true);
IndexService indexService = (IndexService) indexServiceField.get(messageStore);
Field indexFileListField = indexService.getClass().getDeclaredField("indexFileList");
indexFileListField.setAccessible(true);
ArrayList<IndexFile> indexFileList = (ArrayList<IndexFile>) indexFileListField.get(indexService);
return indexFileList;
}
private int getFileCountConsumeQueue() {
int countPerFile = getMsgCountPerConsumeQueueMappedFile();
double fileCount = (double) msgCount / countPerFile;
return (int) Math.ceil(fileCount);
}
private int getFileCountIndexFile() {
int countPerFile = getMsgCountPerIndexFile();
double fileCount = (double) msgCount / countPerFile;
return (int) Math.ceil(fileCount);
}
private int getMsgCountPerConsumeQueueMappedFile() {
int size = messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueue();
return size / CQ_STORE_UNIT_SIZE;// 7 in this case
}
private int getMsgCountPerIndexFile() {
// 7 in this case
return messageStore.getMessageStoreConfig().getMaxIndexNum() - 1;
}
private void buildAndPutMessagesToMessageStore(int msgCount) throws Exception {
int msgLen = topic.getBytes(CHARSET_UTF8).length + 91;
Map<String, String> properties = new HashMap<>(4);
properties.put(MessageConst.PROPERTY_KEYS, keys);
String s = MessageDecoder.messageProperties2String(properties);
int propertiesLen = s.getBytes(CHARSET_UTF8).length;
int commitLogEndFileMinBlankLength = 4 + 4;
int singleMsgBodyLen = mappedFileSize - msgLen - commitLogEndFileMinBlankLength;
int singleMsgBodyLen = mappedFileSize - msgLen - propertiesLen - commitLogEndFileMinBlankLength;
for (int i = 0; i < msgCount; i++) {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic(topic);
msg.setBody(new byte[singleMsgBodyLen]);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setKeys(keys);
msg.setQueueId(queueId);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(storeHost);
msg.setBornHost(bornHost);
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
PutMessageResult result = messageStore.putMessage(msg);
assertTrue(result != null && result.isOk());
}
......@@ -324,8 +387,8 @@ public class DefaultMessageStoreCleanFilesTest {
MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest();
messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize);
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(8);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
messageStoreConfig.setFlushIntervalConsumeQueue(1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册