提交 a9a7dfa7 编写于 作者: J Jiang Haiting

RIP-7 Multiple Directories Storage Suppor

上级 147d23e3
......@@ -56,6 +56,7 @@ import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
......@@ -635,8 +636,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private String diskUtil() {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
double physicRatio = -1;
String storePath =this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String storePathPhysic : paths) {
physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
}
} else {
physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePath);
}
String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
......
......@@ -22,10 +22,12 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
......@@ -43,6 +45,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
......@@ -71,9 +74,20 @@ public class CommitLog {
protected final PutMessageLock putMessageLock;
private volatile Set<String> fullStorePaths = Collections.emptySet();
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
}
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
......@@ -95,6 +109,14 @@ public class CommitLog {
}
public void setFullStorePaths(Set<String> fullStorePaths) {
this.fullStorePaths = fullStorePaths;
}
public Set<String> getFullStorePaths() {
return fullStorePaths;
}
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
......
......@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
......@@ -782,10 +783,20 @@ public class DefaultMessageStore implements MessageStore {
public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
{
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
double maxValue = Double.MIN_VALUE;
String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String clPath : paths) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(clPath);
result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
maxValue = Math.max(maxValue, physicRatio);
}
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(maxValue));
} else {
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
}
{
......@@ -1650,25 +1661,49 @@ public class DefaultMessageStore implements MessageStore {
cleanImmediately = false;
{
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
if (physicRatio > diskSpaceWarningLevelRatio) {
String[] storePaths;
String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
} else {
storePaths = new String[]{commitLogStorePath};
}
Set<String> fullStorePath = new HashSet<>();
double minPhysicRatio = 100;
String minStorePath = null;
for (String storePathPhysic : storePaths) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (minPhysicRatio > physicRatio) {
minPhysicRatio = physicRatio;
minStorePath = storePathPhysic;
}
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(storePathPhysic);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
if (minPhysicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
", so mark disk full, storePathPhysic=" + minStorePath);
}
cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
} else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
", so mark disk ok, storePathPhysic=" + minStorePath);
}
}
if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
+ minPhysicRatio + ", storePathPhysic=" + minStorePath);
return true;
}
}
......
......@@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
......@@ -37,13 +38,13 @@ public class MappedFileQueue {
private final String storePath;
private final int mappedFileSize;
protected final int mappedFileSize;
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
private final AllocateMappedFileService allocateMappedFileService;
private long flushedWhere = 0;
protected long flushedWhere = 0;
private long committedWhere = 0;
private volatile long storeTimestamp = 0;
......@@ -144,18 +145,24 @@ public class MappedFileQueue {
}
}
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
File[] ls = dir.listFiles();
if (ls != null) {
return doLoad(Arrays.asList(ls));
}
return true;
}
public boolean doLoad(List<File> files) {
// ascending order
Arrays.sort(files);
Collections.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
+ " length not matched message store config value, ignore it");
return true;
}
try {
......@@ -171,8 +178,6 @@ public class MappedFileQueue {
return false;
}
}
}
return true;
}
......@@ -204,9 +209,20 @@ public class MappedFileQueue {
}
if (createOffset != -1 && needCreate) {
return tryCreateMappedFile(createOffset);
}
return mappedFileLast;
}
protected MappedFile tryCreateMappedFile(long createOffset) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
......@@ -230,9 +246,6 @@ public class MappedFileQueue {
return mappedFile;
}
return mappedFileLast;
}
public MappedFile getLastMappedFile(final long startOffset) {
return getLastMappedFile(startOffset, true);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class MultiPathMappedFileQueue extends MappedFileQueue {
private final MessageStoreConfig config;
private final Supplier<Set<String>> fullStorePathsSupplier;
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService,
Supplier<Set<String>> fullStorePathsSupplier) {
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService);
this.config = messageStoreConfig;
this.fullStorePathsSupplier = fullStorePathsSupplier;
}
private Set<String> getPaths() {
String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
return new HashSet<>(Arrays.asList(paths));
}
private Set<String> getReadonlyPaths() {
String pathStr = config.getReadOnlyCommitLogStorePaths();
if (StringUtils.isBlank(pathStr)) {
return Collections.emptySet();
}
String[] paths = pathStr.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
return new HashSet<>(Arrays.asList(paths));
}
@Override
public boolean load() {
Set<String> storePathSet = getPaths();
storePathSet.addAll(getReadonlyPaths());
List<File> files = new ArrayList<>();
for (String path : storePathSet) {
File dir = new File(path);
File[] ls = dir.listFiles();
if (ls != null) {
Collections.addAll(files, ls);
}
}
return doLoad(files);
}
@Override
protected MappedFile tryCreateMappedFile(long createOffset) {
long fileIdx = createOffset / this.mappedFileSize;
Set<String> storePath = getPaths();
Set<String> readonlyPathSet = getReadonlyPaths();
Set<String> fullStorePaths =
fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get();
HashSet<String> availableStorePath = new HashSet<>(storePath);
//do not create file in readonly store path.
availableStorePath.removeAll(readonlyPathSet);
//do not create file is space is nearly full.
availableStorePath.removeAll(fullStorePaths);
//if no store path left, fall back to wriable store path.
if (availableStorePath.isEmpty()) {
availableStorePath = new HashSet<>(storePath);
availableStorePath.removeAll(readonlyPathSet);
}
String[] paths = availableStorePath.toArray(new String[]{});
Arrays.sort(paths);
String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset);
String nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
@Override
public void destroy() {
for (MappedFile mf : this.mappedFiles) {
mf.destroy(1000 * 3);
}
this.mappedFiles.clear();
this.flushedWhere = 0;
Set<String> storePathSet = getPaths();
storePathSet.addAll(getReadonlyPaths());
for (String path : storePathSet) {
File file = new File(path);
if (file.isDirectory()) {
file.delete();
}
}
}
}
......@@ -17,10 +17,17 @@
package org.apache.rocketmq.store.config;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;
public class MessageStoreConfig {
public static final String MULTI_PATH_SPLITTER = ":";
//The root directory in which the log data is kept
@ImportantField
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
......@@ -30,6 +37,8 @@ public class MessageStoreConfig {
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
private String readOnlyCommitLogStorePaths = null;
// CommitLog file size,default is 1G
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size,default is 30W
......@@ -676,6 +685,13 @@ public class MessageStoreConfig {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
}
public String getReadOnlyCommitLogStorePaths() {
return readOnlyCommitLogStorePaths;
}
public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) {
this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths;
}
public String getdLegerGroup() {
return dLegerGroup;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;
public class MultiPathMappedFileQueueTest {
@Test
public void testGetLastMappedFile() {
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % storePaths.length;
assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue();
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
@Test
public void testLoadReadOnlyMappedFiles() {
{
//create old mapped files
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog(
"target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % storePaths.length;
assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue();
}
mappedFileQueue.shutdown(1000);
}
// test load and readonly
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/b/");
config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a:target/unit_test_store/c");
MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
mappedFileQueue.load();
assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1025);
mappedFileQueue.destroy();
}
@Test
public void testUpdatePathsOnline() {
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % storePaths.length;
assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue();
if (i == 500) {
config.setStorePathCommitLog("target/unit_test_store/a/:target/unit_test_store/b/");
storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
}
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
@Test
public void testFullStorePath() {
final byte[] fixedMsg = new byte[1024];
Set<String> fullStorePath = new HashSet<>();
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, () -> fullStorePath);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
assertThat(storePaths.length).isEqualTo(3);
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue();
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length);
assertThat(mappedFile.getFileName().startsWith(storePaths[1])).isTrue();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 2);
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue();
fullStorePath.add("target/unit_test_store/b/");
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 3);
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue();
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 4);
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue();
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册