From 474c4eb147b067d72ca49098c9fcb31b08c86977 Mon Sep 17 00:00:00 2001 From: Jiang Haiting Date: Wed, 22 Sep 2021 19:46:12 +0800 Subject: [PATCH] Fix bug for multi path file loading. Opt code checking for multi-path commitlog path in storePathCommitLog. --- .../processor/SendMessageProcessor.java | 10 ++--- .../rocketmq/store/DefaultMessageStore.java | 45 +++++++------------ .../rocketmq/store/MappedFileQueue.java | 4 +- .../store/MultiPathMappedFileQueueTest.java | 5 +++ 4 files changed, 26 insertions(+), 38 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 1c7f860b..8f21d06e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -638,13 +638,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement private String diskUtil() { double physicRatio = 100; String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); - if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { - String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); - for (String storePathPhysic : paths) { - physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic)); - } - } else { - physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePath); + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (String storePathPhysic : paths) { + physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic)); } String storePathLogis = diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index d116c310..ffdac08a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -783,9 +783,9 @@ public class DefaultMessageStore implements MessageStore { public HashMap getRuntimeInfo() { HashMap result = this.storeStatsService.getRuntimeInfo(); - String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); - if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { + { double minPhysicsUsedRatio = Double.MAX_VALUE; + String commitLogStorePath = getStorePathPhysic(); String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); for (String clPath : paths) { double physicRatio = UtilAll.isPathExists(clPath) ? @@ -794,11 +794,6 @@ public class DefaultMessageStore implements MessageStore { minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio); } result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio)); - } else { - String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); - double physicRatio = UtilAll.isPathExists(storePathPhysic) ? - UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic) : -1; - result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); } { @@ -1663,14 +1658,8 @@ public class DefaultMessageStore implements MessageStore { cleanImmediately = false; { - String[] storePaths; String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); - if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { - storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); - } else { - storePaths = new String[]{commitLogStorePath}; - } - + String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); Set fullStorePath = new HashSet<>(); double minPhysicRatio = 100; String minStorePath = null; @@ -1748,25 +1737,21 @@ public class DefaultMessageStore implements MessageStore { } public double calcStorePathPhysicRatio() { + Set fullStorePath = new HashSet<>(); String storePath = getStorePathPhysic(); - if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { - Set fullStorePath = new HashSet<>(); - String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); - double minPhysicRatio = 100; - for (String path : paths) { - double physicRatio = UtilAll.isPathExists(path) ? - UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1; - minPhysicRatio = Math.min(minPhysicRatio, physicRatio); - if (physicRatio > diskSpaceCleanForciblyRatio) { - fullStorePath.add(path); - } + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + double minPhysicRatio = 100; + for (String path : paths) { + double physicRatio = UtilAll.isPathExists(path) ? + UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1; + minPhysicRatio = Math.min(minPhysicRatio, physicRatio); + if (physicRatio > diskSpaceCleanForciblyRatio) { + fullStorePath.add(path); } - DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); - return minPhysicRatio; - } else { - return UtilAll.isPathExists(storePath) ? - UtilAll.getDiskPartitionSpaceUsedPercent(storePath) : -1; } + DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); + return minPhysicRatio; + } public boolean isSpaceFull() { diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 9a531881..8ead1798 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -157,7 +158,8 @@ public class MappedFileQueue { public boolean doLoad(List files) { // ascending order - Collections.sort(files); + files.sort(Comparator.comparing(File::getName)); + for (File file : files) { if (file.length() != this.mappedFileSize) { log.warn(file + "\t" + file.length() diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java index 838b5901..66b3f93b 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.store; import static org.assertj.core.api.Assertions.assertThat; import java.util.HashSet; import java.util.Set; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.Test; @@ -78,6 +79,10 @@ public class MultiPathMappedFileQueueTest { mappedFileQueue.load(); assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024); + for (int i = 0; i < 1024; i++) { + assertThat(mappedFileQueue.mappedFiles.get(i).getFile().getName()) + .isEqualTo(UtilAll.offset2FileName(1024 * i)); + } mappedFileQueue.destroy(); } -- GitLab