From 466bb98eef5731132836b7ddc37366c860958503 Mon Sep 17 00:00:00 2001 From: Jiang Haiting Date: Fri, 17 Sep 2021 11:16:28 +0800 Subject: [PATCH] fix error in org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isSpaceFull --- .../rocketmq/store/DefaultMessageStore.java | 23 +++++++++- .../DefaultMessageStoreCleanFilesTest.java | 42 +++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 0af4c3c0..8543ae4d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1744,8 +1744,29 @@ public class DefaultMessageStore implements MessageStore { public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; } + + public double calcStorePathPhysicRatio() { + String storePath = getStorePathPhysic(); + if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { + Set fullStorePath = new HashSet<>(); + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + double minPhysicRatio = 100; + for (String path : paths) { + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(path); + minPhysicRatio = Math.min(minPhysicRatio, physicRatio); + if (physicRatio > diskSpaceCleanForciblyRatio) { + fullStorePath.add(path); + } + } + DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); + return minPhysicRatio; + } else { + return UtilAll.getDiskPartitionSpaceUsedPercent(storePath); + } + } + public boolean isSpaceFull() { - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); + double physicRatio = calcStorePathPhysicRatio(); double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; if (physicRatio > ratio) { DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio); diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index 7455dfea..f4b8267c 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -94,6 +94,41 @@ public class DefaultMessageStoreCleanFilesTest { } + @Test + public void testIsSpaceFullMultiCommitLogStorePath() throws Exception { + String deleteWhen = "04"; + // the min value of diskMaxUsedSpaceRatio. + int diskMaxUsedSpaceRatio = 1; + // used to set disk-full flag + double diskSpaceCleanForciblyRatio = 0.01D; + MessageStoreConfig config = genMessageStoreConfig(deleteWhen, diskMaxUsedSpaceRatio); + String storePath = config.getStorePathCommitLog(); + StringBuilder storePathBuilder = new StringBuilder(); + for (int i = 0; i < 3; i++) { + storePathBuilder.append(storePath).append(i).append(":"); + } + config.setStorePathCommitLog(storePathBuilder.toString()); + String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + assertEquals(3, paths.length); + initMessageStore(config, diskSpaceCleanForciblyRatio); + + + + // build and put 55 messages, exactly one message per CommitLog file. + buildAndPutMessagesToMessageStore(msgCount); + MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog(); + assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size()); + int fileCountConsumeQueue = getFileCountConsumeQueue(); + MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue(); + assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size()); + cleanCommitLogService.isSpaceFull(); + + assertEquals(1 << 4, messageStore.getRunningFlags().getFlagBits() & (1 << 4)); + messageStore.shutdown(); + messageStore.destroy(); + + } + @Test public void testIsSpaceFullFunctionFull2Empty() throws Exception { String deleteWhen = "04"; @@ -421,6 +456,10 @@ public class DefaultMessageStoreCleanFilesTest { } private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, double diskSpaceCleanForciblyRatio) throws Exception { + initMessageStore(genMessageStoreConfig(deleteWhen,diskMaxUsedSpaceRatio), diskSpaceCleanForciblyRatio); + } + + private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxUsedSpaceRatio) { MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest(); messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize); messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize); @@ -442,7 +481,10 @@ public class DefaultMessageStoreCleanFilesTest { String storePathCommitLog = storePathRootDir + File.separator + "commitlog"; messageStoreConfig.setStorePathRootDir(storePathRootDir); messageStoreConfig.setStorePathCommitLog(storePathCommitLog); + return messageStoreConfig; + } + private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception { messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); -- GitLab