未验证 提交 4dbdbf0a 编写于 作者: youlixishia's avatar youlixishia 提交者: GitHub

Merge pull request #3357 from Jason918/RIP-7

RIP-7 Multiple Directories Storage Support
...@@ -56,6 +56,7 @@ import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; ...@@ -56,6 +56,7 @@ import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
...@@ -186,7 +187,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -186,7 +187,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} }
} }
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) { || delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP; queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
...@@ -636,8 +637,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -636,8 +637,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} }
private String diskUtil() { private String diskUtil() {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); double physicRatio = 100;
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String storePathPhysic : paths) {
physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
}
String storePathLogis = String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
......
...@@ -198,6 +198,11 @@ public class UtilAll { ...@@ -198,6 +198,11 @@ public class UtilAll {
cal.get(Calendar.SECOND)); cal.get(Calendar.SECOND));
} }
public static boolean isPathExists(final String path) {
File file = new File(path);
return file.exists();
}
public static double getDiskPartitionSpaceUsedPercent(final String path) { public static double getDiskPartitionSpaceUsedPercent(final String path) {
if (null == path || path.isEmpty()) { if (null == path || path.isEmpty()) {
log.error("Error when measuring disk space usage, path is null or empty, path : {}", path); log.error("Error when measuring disk space usage, path is null or empty, path : {}", path);
......
...@@ -22,10 +22,12 @@ import java.net.InetAddress; ...@@ -22,10 +22,12 @@ import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier; import java.util.function.Supplier;
...@@ -43,6 +45,7 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -43,6 +45,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.schedule.ScheduleMessageService; import org.apache.rocketmq.store.schedule.ScheduleMessageService;
...@@ -71,9 +74,20 @@ public class CommitLog { ...@@ -71,9 +74,20 @@ public class CommitLog {
protected final PutMessageLock putMessageLock; protected final PutMessageLock putMessageLock;
private volatile Set<String> fullStorePaths = Collections.emptySet();
public CommitLog(final DefaultMessageStore defaultMessageStore) { public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
}
this.defaultMessageStore = defaultMessageStore; this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
...@@ -95,6 +109,14 @@ public class CommitLog { ...@@ -95,6 +109,14 @@ public class CommitLog {
} }
public void setFullStorePaths(Set<String> fullStorePaths) {
this.fullStorePaths = fullStorePaths;
}
public Set<String> getFullStorePaths() {
return fullStorePaths;
}
public boolean load() { public boolean load() {
boolean result = this.mappedFileQueue.load(); boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed")); log.info("load commit log " + (result ? "OK" : "Failed"));
......
...@@ -26,6 +26,7 @@ import java.nio.ByteBuffer; ...@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileLock; import java.nio.channels.FileLock;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
...@@ -784,15 +785,22 @@ public class DefaultMessageStore implements MessageStore { ...@@ -784,15 +785,22 @@ public class DefaultMessageStore implements MessageStore {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo(); HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
{ {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); double minPhysicsUsedRatio = Double.MAX_VALUE;
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); String commitLogStorePath = getStorePathPhysic();
String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String clPath : paths) {
double physicRatio = UtilAll.isPathExists(clPath) ?
UtilAll.getDiskPartitionSpaceUsedPercent(clPath) : -1;
result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio);
}
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio));
} }
{ {
String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()); String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); double logicsRatio = UtilAll.isPathExists(storePathLogics) ?
UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics) : -1;
result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio)); result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
} }
...@@ -1651,25 +1659,43 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1651,25 +1659,43 @@ public class DefaultMessageStore implements MessageStore {
cleanImmediately = false; cleanImmediately = false;
{ {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
if (physicRatio > diskSpaceWarningLevelRatio) { String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
Set<String> fullStorePath = new HashSet<>();
double minPhysicRatio = 100;
String minStorePath = null;
for (String storePathPhysic : storePaths) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (minPhysicRatio > physicRatio) {
minPhysicRatio = physicRatio;
minStorePath = storePathPhysic;
}
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(storePathPhysic);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
if (minPhysicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) { if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
", so mark disk full, storePathPhysic=" + minStorePath);
} }
cleanImmediately = true; cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) { } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true; cleanImmediately = true;
} else { } else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) { if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
", so mark disk ok, storePathPhysic=" + minStorePath);
} }
} }
if (physicRatio < 0 || physicRatio > ratio) { if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
+ minPhysicRatio + ", storePathPhysic=" + minStorePath);
return true; return true;
} }
} }
...@@ -1710,8 +1736,27 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1710,8 +1736,27 @@ public class DefaultMessageStore implements MessageStore {
public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
} }
public double calcStorePathPhysicRatio() {
Set<String> fullStorePath = new HashSet<>();
String storePath = getStorePathPhysic();
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
double minPhysicRatio = 100;
for (String path : paths) {
double physicRatio = UtilAll.isPathExists(path) ?
UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1;
minPhysicRatio = Math.min(minPhysicRatio, physicRatio);
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(path);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
return minPhysicRatio;
}
public boolean isSpaceFull() { public boolean isSpaceFull() {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); double physicRatio = calcStorePathPhysicRatio();
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
if (physicRatio > ratio) { if (physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio); DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio);
......
...@@ -20,6 +20,7 @@ import java.io.File; ...@@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
...@@ -37,13 +38,13 @@ public class MappedFileQueue { ...@@ -37,13 +38,13 @@ public class MappedFileQueue {
private final String storePath; private final String storePath;
private final int mappedFileSize; protected final int mappedFileSize;
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>(); protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
private final AllocateMappedFileService allocateMappedFileService; private final AllocateMappedFileService allocateMappedFileService;
private long flushedWhere = 0; protected long flushedWhere = 0;
private long committedWhere = 0; private long committedWhere = 0;
private volatile long storeTimestamp = 0; private volatile long storeTimestamp = 0;
...@@ -144,35 +145,40 @@ public class MappedFileQueue { ...@@ -144,35 +145,40 @@ public class MappedFileQueue {
} }
} }
public boolean load() { public boolean load() {
File dir = new File(this.storePath); File dir = new File(this.storePath);
File[] files = dir.listFiles(); File[] ls = dir.listFiles();
if (files != null) { if (ls != null) {
// ascending order return doLoad(Arrays.asList(ls));
Arrays.sort(files); }
for (File file : files) { return true;
}
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try { public boolean doLoad(List<File> files) {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); // ascending order
files.sort(Comparator.comparing(File::getName));
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize); for (File file : files) {
mappedFile.setCommittedPosition(this.mappedFileSize); if (file.length() != this.mappedFileSize) {
this.mappedFiles.add(mappedFile); log.warn(file + "\t" + file.length()
log.info("load " + file.getPath() + " OK"); + " length not matched message store config value, ignore it");
} catch (IOException e) { return true;
log.error("load file " + file + " error", e);
return false;
}
} }
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
return true; return true;
} }
...@@ -204,33 +210,41 @@ public class MappedFileQueue { ...@@ -204,33 +210,41 @@ public class MappedFileQueue {
} }
if (createOffset != -1 && needCreate) { if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); return tryCreateMappedFile(createOffset);
String nextNextFilePath = this.storePath + File.separator }
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) { return mappedFileLast;
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, }
protected MappedFile tryCreateMappedFile(long createOffset) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize); nextNextFilePath, this.mappedFileSize);
} else { } else {
try { try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) { } catch (IOException e) {
log.error("create mappedFile exception", e); log.error("create mappedFile exception", e);
}
} }
}
if (mappedFile != null) { if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) { if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true); mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
} }
this.mappedFiles.add(mappedFile);
return mappedFile;
} }
return mappedFileLast; return mappedFile;
} }
public MappedFile getLastMappedFile(final long startOffset) { public MappedFile getLastMappedFile(final long startOffset) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class MultiPathMappedFileQueue extends MappedFileQueue {
private final MessageStoreConfig config;
private final Supplier<Set<String>> fullStorePathsSupplier;
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService,
Supplier<Set<String>> fullStorePathsSupplier) {
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService);
this.config = messageStoreConfig;
this.fullStorePathsSupplier = fullStorePathsSupplier;
}
private Set<String> getPaths() {
String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
return new HashSet<>(Arrays.asList(paths));
}
private Set<String> getReadonlyPaths() {
String pathStr = config.getReadOnlyCommitLogStorePaths();
if (StringUtils.isBlank(pathStr)) {
return Collections.emptySet();
}
String[] paths = pathStr.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
return new HashSet<>(Arrays.asList(paths));
}
@Override
public boolean load() {
Set<String> storePathSet = getPaths();
storePathSet.addAll(getReadonlyPaths());
List<File> files = new ArrayList<>();
for (String path : storePathSet) {
File dir = new File(path);
File[] ls = dir.listFiles();
if (ls != null) {
Collections.addAll(files, ls);
}
}
return doLoad(files);
}
@Override
protected MappedFile tryCreateMappedFile(long createOffset) {
long fileIdx = createOffset / this.mappedFileSize;
Set<String> storePath = getPaths();
Set<String> readonlyPathSet = getReadonlyPaths();
Set<String> fullStorePaths =
fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get();
HashSet<String> availableStorePath = new HashSet<>(storePath);
//do not create file in readonly store path.
availableStorePath.removeAll(readonlyPathSet);
//do not create file is space is nearly full.
availableStorePath.removeAll(fullStorePaths);
//if no store path left, fall back to writable store path.
if (availableStorePath.isEmpty()) {
availableStorePath = new HashSet<>(storePath);
availableStorePath.removeAll(readonlyPathSet);
}
String[] paths = availableStorePath.toArray(new String[]{});
Arrays.sort(paths);
String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset);
String nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
@Override
public void destroy() {
for (MappedFile mf : this.mappedFiles) {
mf.destroy(1000 * 3);
}
this.mappedFiles.clear();
this.flushedWhere = 0;
Set<String> storePathSet = getPaths();
storePathSet.addAll(getReadonlyPaths());
for (String path : storePathSet) {
File file = new File(path);
if (file.isDirectory()) {
file.delete();
}
}
}
}
...@@ -17,10 +17,14 @@ ...@@ -17,10 +17,14 @@
package org.apache.rocketmq.store.config; package org.apache.rocketmq.store.config;
import java.io.File; import java.io.File;
import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueue;
public class MessageStoreConfig { public class MessageStoreConfig {
public static final String MULTI_PATH_SPLITTER = System.getProperty("rocketmq.broker.multiPathSplitter", ",");
//The root directory in which the log data is kept //The root directory in which the log data is kept
@ImportantField @ImportantField
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
...@@ -30,6 +34,8 @@ public class MessageStoreConfig { ...@@ -30,6 +34,8 @@ public class MessageStoreConfig {
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog"; + File.separator + "commitlog";
private String readOnlyCommitLogStorePaths = null;
// CommitLog file size,default is 1G // CommitLog file size,default is 1G
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024; private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size,default is 30W // ConsumeQueue file size,default is 30W
...@@ -676,6 +682,13 @@ public class MessageStoreConfig { ...@@ -676,6 +682,13 @@ public class MessageStoreConfig {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
} }
public String getReadOnlyCommitLogStorePaths() {
return readOnlyCommitLogStorePaths;
}
public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) {
this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths;
}
public String getdLegerGroup() { public String getdLegerGroup() {
return dLegerGroup; return dLegerGroup;
} }
......
...@@ -94,6 +94,41 @@ public class DefaultMessageStoreCleanFilesTest { ...@@ -94,6 +94,41 @@ public class DefaultMessageStoreCleanFilesTest {
} }
@Test
public void testIsSpaceFullMultiCommitLogStorePath() throws Exception {
String deleteWhen = "04";
// the min value of diskMaxUsedSpaceRatio.
int diskMaxUsedSpaceRatio = 1;
// used to set disk-full flag
double diskSpaceCleanForciblyRatio = 0.01D;
MessageStoreConfig config = genMessageStoreConfig(deleteWhen, diskMaxUsedSpaceRatio);
String storePath = config.getStorePathCommitLog();
StringBuilder storePathBuilder = new StringBuilder();
for (int i = 0; i < 3; i++) {
storePathBuilder.append(storePath).append(i).append(MessageStoreConfig.MULTI_PATH_SPLITTER);
}
config.setStorePathCommitLog(storePathBuilder.toString());
String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
assertEquals(3, paths.length);
initMessageStore(config, diskSpaceCleanForciblyRatio);
// build and put 55 messages, exactly one message per CommitLog file.
buildAndPutMessagesToMessageStore(msgCount);
MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog();
assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size());
int fileCountConsumeQueue = getFileCountConsumeQueue();
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
cleanCommitLogService.isSpaceFull();
assertEquals(1 << 4, messageStore.getRunningFlags().getFlagBits() & (1 << 4));
messageStore.shutdown();
messageStore.destroy();
}
@Test @Test
public void testIsSpaceFullFunctionFull2Empty() throws Exception { public void testIsSpaceFullFunctionFull2Empty() throws Exception {
String deleteWhen = "04"; String deleteWhen = "04";
...@@ -421,6 +456,10 @@ public class DefaultMessageStoreCleanFilesTest { ...@@ -421,6 +456,10 @@ public class DefaultMessageStoreCleanFilesTest {
} }
private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, double diskSpaceCleanForciblyRatio) throws Exception { private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, double diskSpaceCleanForciblyRatio) throws Exception {
initMessageStore(genMessageStoreConfig(deleteWhen,diskMaxUsedSpaceRatio), diskSpaceCleanForciblyRatio);
}
private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxUsedSpaceRatio) {
MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest(); MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest();
messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize); messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize); messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize);
...@@ -442,7 +481,10 @@ public class DefaultMessageStoreCleanFilesTest { ...@@ -442,7 +481,10 @@ public class DefaultMessageStoreCleanFilesTest {
String storePathCommitLog = storePathRootDir + File.separator + "commitlog"; String storePathCommitLog = storePathRootDir + File.separator + "commitlog";
messageStoreConfig.setStorePathRootDir(storePathRootDir); messageStoreConfig.setStorePathRootDir(storePathRootDir);
messageStoreConfig.setStorePathCommitLog(storePathCommitLog); messageStoreConfig.setStorePathCommitLog(storePathCommitLog);
return messageStoreConfig;
}
private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception {
messageStore = new DefaultMessageStore(messageStoreConfig, messageStore = new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;
public class MultiPathMappedFileQueueTest {
@Test
public void testGetLastMappedFile() {
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % storePaths.length;
assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue();
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
@Test
public void testLoadReadOnlyMappedFiles() {
{
//create old mapped files
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % storePaths.length;
assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue();
}
mappedFileQueue.shutdown(1000);
}
// test load and readonly
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/b/");
config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c");
MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
mappedFileQueue.load();
assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024);
for (int i = 0; i < 1024; i++) {
assertThat(mappedFileQueue.mappedFiles.get(i).getFile().getName())
.isEqualTo(UtilAll.offset2FileName(1024 * i));
}
mappedFileQueue.destroy();
}
@Test
public void testUpdatePathsOnline() {
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % storePaths.length;
assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue();
if (i == 500) {
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/");
storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
}
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
@Test
public void testFullStorePath() {
final byte[] fixedMsg = new byte[1024];
Set<String> fullStorePath = new HashSet<>();
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, () -> fullStorePath);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
assertThat(storePaths.length).isEqualTo(3);
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue();
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length);
assertThat(mappedFile.getFileName().startsWith(storePaths[1])).isTrue();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 2);
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue();
fullStorePath.add("target/unit_test_store/b/");
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 3);
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue();
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 4);
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue();
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册