diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index fd510f11a55ae86a54bb919f43693240c0cba2c4..abf7356e8a3ca2eca223b81ccaa575c32ecbb0f0 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -19,8 +19,12 @@ package org.apache.rocketmq.store; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.index.IndexFile; +import org.apache.rocketmq.store.index.IndexService; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.After; import org.junit.Before; @@ -31,7 +35,9 @@ import java.lang.reflect.Field; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Calendar; +import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -52,6 +58,7 @@ public class DefaultMessageStoreCleanFilesTest { private SocketAddress storeHost; private String topic = "test"; + private String keys = "hello"; private int queueId = 0; private int fileCountCommitLog = 55; // exactly one message per CommitLog file. @@ -87,6 +94,9 @@ public class DefaultMessageStoreCleanFilesTest { MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue(); assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size()); + int fileCountIndexFile = getFileCountIndexFile(); + assertEquals(fileCountIndexFile, getIndexFileList().size()); + int expireFileCount = 15; expireFiles(commitLogQueue, expireFileCount); @@ -101,6 +111,10 @@ public class DefaultMessageStoreCleanFilesTest { int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile(); int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile); assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size()); + + int msgCountPerIndexFile = getMsgCountPerIndexFile(); + int expectDeleteCountIndexFile = (int) Math.floor((double) expectDeletedCount / msgCountPerIndexFile); + assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size()); } } @@ -126,6 +140,9 @@ public class DefaultMessageStoreCleanFilesTest { MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue(); assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size()); + int fileCountIndexFile = getFileCountIndexFile(); + assertEquals(fileCountIndexFile, getIndexFileList().size()); + int expireFileCount = 15; expireFiles(commitLogQueue, expireFileCount); @@ -140,6 +157,10 @@ public class DefaultMessageStoreCleanFilesTest { int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile(); int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile); assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size()); + + int msgCountPerIndexFile = getMsgCountPerIndexFile(); + int expectDeleteCountIndexFile = (int) Math.floor((double) expectDeletedCount / msgCountPerIndexFile); + assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size()); } } @@ -165,6 +186,9 @@ public class DefaultMessageStoreCleanFilesTest { MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue(); assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size()); + int fileCountIndexFile = getFileCountIndexFile(); + assertEquals(fileCountIndexFile, getIndexFileList().size()); + // In this case, there is no need to expire the files. // int expireFileCount = 15; // expireFiles(commitLogQueue, expireFileCount); @@ -181,6 +205,10 @@ public class DefaultMessageStoreCleanFilesTest { int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile(); int expectDeleteCountConsumeQueue = (int) Math.floor((double) (a * 10) / msgCountPerFile); assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size()); + + int msgCountPerIndexFile = getMsgCountPerIndexFile(); + int expectDeleteCountIndexFile = (int) Math.floor((double) (a * 10) / msgCountPerIndexFile); + assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size()); } } @@ -208,6 +236,9 @@ public class DefaultMessageStoreCleanFilesTest { MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue(); assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size()); + int fileCountIndexFile = getFileCountIndexFile(); + assertEquals(fileCountIndexFile, getIndexFileList().size()); + int expireFileCount = 15; expireFiles(commitLogQueue, expireFileCount); @@ -222,6 +253,10 @@ public class DefaultMessageStoreCleanFilesTest { int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile(); int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile); assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size()); + + int msgCountPerIndexFile = getMsgCountPerIndexFile(); + int expectDeleteCountIndexFile = (int) Math.floor((double) (a * 10) / msgCountPerIndexFile); + assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size()); } } @@ -274,32 +309,60 @@ public class DefaultMessageStoreCleanFilesTest { return fileQueue; } + private ArrayList getIndexFileList() throws Exception { + Field indexServiceField = messageStore.getClass().getDeclaredField("indexService"); + indexServiceField.setAccessible(true); + IndexService indexService = (IndexService) indexServiceField.get(messageStore); + + Field indexFileListField = indexService.getClass().getDeclaredField("indexFileList"); + indexFileListField.setAccessible(true); + ArrayList indexFileList = (ArrayList) indexFileListField.get(indexService); + + return indexFileList; + } + private int getFileCountConsumeQueue() { int countPerFile = getMsgCountPerConsumeQueueMappedFile(); double fileCount = (double) msgCount / countPerFile; return (int) Math.ceil(fileCount); } + private int getFileCountIndexFile() { + int countPerFile = getMsgCountPerIndexFile(); + double fileCount = (double) msgCount / countPerFile; + return (int) Math.ceil(fileCount); + } + private int getMsgCountPerConsumeQueueMappedFile() { int size = messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueue(); return size / CQ_STORE_UNIT_SIZE;// 7 in this case } + private int getMsgCountPerIndexFile() { + // 7 in this case + return messageStore.getMessageStoreConfig().getMaxIndexNum() - 1; + } + private void buildAndPutMessagesToMessageStore(int msgCount) throws Exception { int msgLen = topic.getBytes(CHARSET_UTF8).length + 91; + Map properties = new HashMap<>(4); + properties.put(MessageConst.PROPERTY_KEYS, keys); + String s = MessageDecoder.messageProperties2String(properties); + int propertiesLen = s.getBytes(CHARSET_UTF8).length; int commitLogEndFileMinBlankLength = 4 + 4; - int singleMsgBodyLen = mappedFileSize - msgLen - commitLogEndFileMinBlankLength; + int singleMsgBodyLen = mappedFileSize - msgLen - propertiesLen - commitLogEndFileMinBlankLength; for (int i = 0; i < msgCount; i++) { MessageExtBrokerInner msg = new MessageExtBrokerInner(); msg.setTopic(topic); msg.setBody(new byte[singleMsgBodyLen]); - msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setKeys(keys); msg.setQueueId(queueId); msg.setSysFlag(0); msg.setBornTimestamp(System.currentTimeMillis()); msg.setStoreHost(storeHost); msg.setBornHost(bornHost); + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); PutMessageResult result = messageStore.putMessage(msg); assertTrue(result != null && result.isOk()); } @@ -324,8 +387,8 @@ public class DefaultMessageStoreCleanFilesTest { MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest(); messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize); messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize); - messageStoreConfig.setMaxHashSlotNum(10000); - messageStoreConfig.setMaxIndexNum(100 * 100); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(8); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); messageStoreConfig.setFlushIntervalConsumeQueue(1);