diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 1c7f860b04f3b5d0e71ebc54ed45d4306647dfd4..8f21d06e28fa55b7fc0697fe4fe668743c01e1cb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -638,13 +638,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement private String diskUtil() { double physicRatio = 100; String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); - if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { - String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); - for (String storePathPhysic : paths) { - physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic)); - } - } else { - physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePath); + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (String storePathPhysic : paths) { + physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic)); } String storePathLogis = diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index d116c310c16c88762d09ca00f5613312b322a990..ffdac08aa43e19c2cce1653a5234760565c030a1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -783,9 +783,9 @@ public class DefaultMessageStore implements MessageStore { public HashMap getRuntimeInfo() { HashMap result = this.storeStatsService.getRuntimeInfo(); - String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); - if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { + { double minPhysicsUsedRatio = Double.MAX_VALUE; + String commitLogStorePath = getStorePathPhysic(); String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); for (String clPath : paths) { double physicRatio = UtilAll.isPathExists(clPath) ? @@ -794,11 +794,6 @@ public class DefaultMessageStore implements MessageStore { minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio); } result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio)); - } else { - String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); - double physicRatio = UtilAll.isPathExists(storePathPhysic) ? - UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic) : -1; - result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); } { @@ -1663,14 +1658,8 @@ public class DefaultMessageStore implements MessageStore { cleanImmediately = false; { - String[] storePaths; String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); - if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { - storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); - } else { - storePaths = new String[]{commitLogStorePath}; - } - + String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); Set fullStorePath = new HashSet<>(); double minPhysicRatio = 100; String minStorePath = null; @@ -1748,25 +1737,21 @@ public class DefaultMessageStore implements MessageStore { } public double calcStorePathPhysicRatio() { + Set fullStorePath = new HashSet<>(); String storePath = getStorePathPhysic(); - if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { - Set fullStorePath = new HashSet<>(); - String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); - double minPhysicRatio = 100; - for (String path : paths) { - double physicRatio = UtilAll.isPathExists(path) ? - UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1; - minPhysicRatio = Math.min(minPhysicRatio, physicRatio); - if (physicRatio > diskSpaceCleanForciblyRatio) { - fullStorePath.add(path); - } + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + double minPhysicRatio = 100; + for (String path : paths) { + double physicRatio = UtilAll.isPathExists(path) ? + UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1; + minPhysicRatio = Math.min(minPhysicRatio, physicRatio); + if (physicRatio > diskSpaceCleanForciblyRatio) { + fullStorePath.add(path); } - DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); - return minPhysicRatio; - } else { - return UtilAll.isPathExists(storePath) ? - UtilAll.getDiskPartitionSpaceUsedPercent(storePath) : -1; } + DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); + return minPhysicRatio; + } public boolean isSpaceFull() { diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 9a5318812ca575d6db8d24c5fdbd887f9880f7e2..8ead1798a458e3dd0bd6c12657ec9bac71046033 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -157,7 +158,8 @@ public class MappedFileQueue { public boolean doLoad(List files) { // ascending order - Collections.sort(files); + files.sort(Comparator.comparing(File::getName)); + for (File file : files) { if (file.length() != this.mappedFileSize) { log.warn(file + "\t" + file.length() diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java index 838b59015921fe830a2f4976d2834c1a4b2405b8..66b3f93b01f8db4449b427dfdeaa087523254d7c 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.store; import static org.assertj.core.api.Assertions.assertThat; import java.util.HashSet; import java.util.Set; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.Test; @@ -78,6 +79,10 @@ public class MultiPathMappedFileQueueTest { mappedFileQueue.load(); assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024); + for (int i = 0; i < 1024; i++) { + assertThat(mappedFileQueue.mappedFiles.get(i).getFile().getName()) + .isEqualTo(UtilAll.offset2FileName(1024 * i)); + } mappedFileQueue.destroy(); }