提交 474c4eb1 编写于 作者: J Jiang Haiting

Fix bug for multi path file loading. Opt code checking for multi-path...

Fix bug for multi path file loading. Opt code checking for multi-path commitlog path in storePathCommitLog.
上级 f32406d6
...@@ -638,13 +638,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -638,13 +638,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
private String diskUtil() { private String diskUtil() {
double physicRatio = 100; double physicRatio = 100;
String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); for (String storePathPhysic : paths) {
for (String storePathPhysic : paths) { physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
}
} else {
physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePath);
} }
String storePathLogis = String storePathLogis =
......
...@@ -783,9 +783,9 @@ public class DefaultMessageStore implements MessageStore { ...@@ -783,9 +783,9 @@ public class DefaultMessageStore implements MessageStore {
public HashMap<String, String> getRuntimeInfo() { public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo(); HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); {
if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
double minPhysicsUsedRatio = Double.MAX_VALUE; double minPhysicsUsedRatio = Double.MAX_VALUE;
String commitLogStorePath = getStorePathPhysic();
String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String clPath : paths) { for (String clPath : paths) {
double physicRatio = UtilAll.isPathExists(clPath) ? double physicRatio = UtilAll.isPathExists(clPath) ?
...@@ -794,11 +794,6 @@ public class DefaultMessageStore implements MessageStore { ...@@ -794,11 +794,6 @@ public class DefaultMessageStore implements MessageStore {
minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio); minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio);
} }
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio)); result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio));
} else {
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.isPathExists(storePathPhysic) ?
UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic) : -1;
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
} }
{ {
...@@ -1663,14 +1658,8 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1663,14 +1658,8 @@ public class DefaultMessageStore implements MessageStore {
cleanImmediately = false; cleanImmediately = false;
{ {
String[] storePaths;
String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
} else {
storePaths = new String[]{commitLogStorePath};
}
Set<String> fullStorePath = new HashSet<>(); Set<String> fullStorePath = new HashSet<>();
double minPhysicRatio = 100; double minPhysicRatio = 100;
String minStorePath = null; String minStorePath = null;
...@@ -1748,25 +1737,21 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1748,25 +1737,21 @@ public class DefaultMessageStore implements MessageStore {
} }
public double calcStorePathPhysicRatio() { public double calcStorePathPhysicRatio() {
Set<String> fullStorePath = new HashSet<>();
String storePath = getStorePathPhysic(); String storePath = getStorePathPhysic();
if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
Set<String> fullStorePath = new HashSet<>(); double minPhysicRatio = 100;
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); for (String path : paths) {
double minPhysicRatio = 100; double physicRatio = UtilAll.isPathExists(path) ?
for (String path : paths) { UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1;
double physicRatio = UtilAll.isPathExists(path) ? minPhysicRatio = Math.min(minPhysicRatio, physicRatio);
UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1; if (physicRatio > diskSpaceCleanForciblyRatio) {
minPhysicRatio = Math.min(minPhysicRatio, physicRatio); fullStorePath.add(path);
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(path);
}
} }
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
return minPhysicRatio;
} else {
return UtilAll.isPathExists(storePath) ?
UtilAll.getDiskPartitionSpaceUsedPercent(storePath) : -1;
} }
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
return minPhysicRatio;
} }
public boolean isSpaceFull() { public boolean isSpaceFull() {
......
...@@ -21,6 +21,7 @@ import java.io.IOException; ...@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
...@@ -157,7 +158,8 @@ public class MappedFileQueue { ...@@ -157,7 +158,8 @@ public class MappedFileQueue {
public boolean doLoad(List<File> files) { public boolean doLoad(List<File> files) {
// ascending order // ascending order
Collections.sort(files); files.sort(Comparator.comparing(File::getName));
for (File file : files) { for (File file : files) {
if (file.length() != this.mappedFileSize) { if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length() log.warn(file + "\t" + file.length()
......
...@@ -20,6 +20,7 @@ package org.apache.rocketmq.store; ...@@ -20,6 +20,7 @@ package org.apache.rocketmq.store;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test; import org.junit.Test;
...@@ -78,6 +79,10 @@ public class MultiPathMappedFileQueueTest { ...@@ -78,6 +79,10 @@ public class MultiPathMappedFileQueueTest {
mappedFileQueue.load(); mappedFileQueue.load();
assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024); assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024);
for (int i = 0; i < 1024; i++) {
assertThat(mappedFileQueue.mappedFiles.get(i).getFile().getName())
.isEqualTo(UtilAll.offset2FileName(1024 * i));
}
mappedFileQueue.destroy(); mappedFileQueue.destroy();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册