提交 0a31de1c 编写于 作者: J Jason918 提交者: Zhendong Liu

[RIP-7] Multiple Directories Storage Support (#751)

* Update issue_template.md

* Update README.md

* RIP-7 Multiple Directories Storage Suppor

* add readonly commit log paths

* update

* finish test

* add test for testUpdatePathsOnline

* bug fix

* code format
上级 5dc64d03
......@@ -4,7 +4,7 @@ about: Describe this issue template's purpose here.
---
The issue tracker is **ONLY** used for bug report and feature request. Keep in mind, please check whether there is an existing same report before your raise a new one.
The issue tracker is **ONLY** used for bug report(feature request need to follow [RIP process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)). Keep in mind, please check whether there is an existing same report before your raise a new one.
Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration and other improvements.
......
......@@ -32,6 +32,7 @@ It offers a variety of features:
* Home: <https://rocketmq.apache.org>
* Docs: <https://rocketmq.apache.org/docs/quick-start/>
* Issues: <https://github.com/apache/rocketmq/issues>
* Rips: <https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal>
* Ask: <https://stackoverflow.com/questions/tagged/rocketmq>
* Slack: <https://rocketmq-invite-automation.herokuapp.com/>
......@@ -43,7 +44,7 @@ It offers a variety of features:
----------
## Contributing
We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).
We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).
----------
## License
......
......@@ -560,8 +560,15 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private String diskUtil() {
double physicRatio = -1;
if (this.brokerController.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
for (String storePathPhysic : this.brokerController.getMessageStoreConfig().getCommitLogStorePaths()) {
physicRatio = Math.max(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
}
} else {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
}
String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
......
......@@ -64,8 +64,16 @@ public class CommitLog {
private final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
if (defaultMessageStore.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
} else {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
}
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
......
......@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
......@@ -693,11 +694,18 @@ public class DefaultMessageStore implements MessageStore {
public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
{
if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
double maxValue = Double.MIN_VALUE;
for (String clPath : DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths()) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(clPath);
result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
maxValue = Math.max(maxValue, physicRatio);
}
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(maxValue));
} else {
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
}
{
......@@ -1537,12 +1545,19 @@ public class DefaultMessageStore implements MessageStore {
cleanImmediately = false;
{
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
List<String> storePaths;
if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
storePaths = DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths();
} else {
storePaths = Collections.singletonList(DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog());
}
for (String storePathPhysic : storePaths) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (physicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full, storePathPhysic=" + storePathPhysic);
}
cleanImmediately = true;
......@@ -1551,15 +1566,16 @@ public class DefaultMessageStore implements MessageStore {
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok, storePathPhysic=" + storePathPhysic);
}
}
if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio + ", storePathPhysic=" + storePathPhysic);
return true;
}
}
}
{
String storePathLogics = StorePathConfigHelper
......
......@@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
......@@ -37,13 +38,13 @@ public class MappedFileQueue {
private final String storePath;
private final int mappedFileSize;
protected final int mappedFileSize;
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
private final AllocateMappedFileService allocateMappedFileService;
private long flushedWhere = 0;
protected long flushedWhere = 0;
private long committedWhere = 0;
private volatile long storeTimestamp = 0;
......@@ -144,14 +145,20 @@ public class MappedFileQueue {
}
}
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
File[] ls = dir.listFiles();
if (ls != null) {
return doLoad(Arrays.asList(ls));
}
return true;
}
public boolean doLoad(List<File> files) {
// ascending order
Arrays.sort(files);
Collections.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
......@@ -171,8 +178,6 @@ public class MappedFileQueue {
return false;
}
}
}
return true;
}
......@@ -204,9 +209,20 @@ public class MappedFileQueue {
}
if (createOffset != -1 && needCreate) {
return tryCreateMappedFile(createOffset);
}
return mappedFileLast;
}
protected MappedFile tryCreateMappedFile(long createOffset) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
......@@ -230,9 +246,6 @@ public class MappedFileQueue {
return mappedFile;
}
return mappedFileLast;
}
public MappedFile getLastMappedFile(final long startOffset) {
return getLastMappedFile(startOffset, true);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class MultiPathMappedFileQueue extends MappedFileQueue {
private final MessageStoreConfig config;
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService) {
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService);
this.config = messageStoreConfig;
}
@Override
public boolean load() {
List<File> files = new ArrayList<>();
for (String path : config.getCommitLogStorePaths()) {
File dir = new File(path);
File[] ls = dir.listFiles();
if (ls != null) {
Collections.addAll(files, ls);
}
}
if (config.getReadOnlyCommitLogStorePaths() != null) {
for (String path : config.getReadOnlyCommitLogStorePaths()) {
File dir = new File(path);
File[] ls = dir.listFiles();
if (ls != null) {
Collections.addAll(files, ls);
}
}
}
return doLoad(files);
}
@Override
protected MappedFile tryCreateMappedFile(long createOffset) {
long fileIdx = createOffset / this.mappedFileSize;
List<String> pathList = config.getCommitLogStorePaths();
String nextFilePath = pathList.get((int) (fileIdx % pathList.size())) + File.separator
+ UtilAll.offset2FileName(createOffset);
String nextNextFilePath = pathList.get((int) ((fileIdx + 1) % pathList.size())) + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
@Override
public void destroy() {
for (MappedFile mf : this.mappedFiles) {
mf.destroy(1000 * 3);
}
this.mappedFiles.clear();
this.flushedWhere = 0;
if (config.getCommitLogStorePaths() != null) {
for (String path : config.getCommitLogStorePaths()) {
File file = new File(path);
if (file.isDirectory()) {
file.delete();
}
}
}
if (config.getReadOnlyCommitLogStorePaths() != null) {
for (String path : config.getReadOnlyCommitLogStorePaths()) {
File file = new File(path);
if (file.isDirectory()) {
file.delete();
}
}
}
}
}
......@@ -17,6 +17,10 @@
package org.apache.rocketmq.store.config;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;
......@@ -30,6 +34,12 @@ public class MessageStoreConfig {
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
private boolean multiCommitLogPathEnable = false;
private List<String> commitLogStorePaths = null;
private List<String> readOnlyCommitLogStorePaths = null;
// CommitLog file size,default is 1G
private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size,default is 30W
......@@ -666,4 +676,41 @@ public class MessageStoreConfig {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
}
public boolean isMultiCommitLogPathEnable() {
return multiCommitLogPathEnable;
}
public void setMultiCommitLogPathEnable(boolean multiCommitLogPathEnable) {
this.multiCommitLogPathEnable = multiCommitLogPathEnable;
}
public List<String> getCommitLogStorePaths() {
return commitLogStorePaths;
}
public void setCommitLogStorePaths(String commitLogStorePaths) {
String[] tokens = commitLogStorePaths.trim().split(":");
List<String> pathList = Arrays.asList(tokens);
Collections.sort(pathList);
this.commitLogStorePaths = pathList;
}
public void setCommitLogStorePaths(List<String> commitLogStorePaths) {
this.commitLogStorePaths = commitLogStorePaths;
}
public List<String> getReadOnlyCommitLogStorePaths() {
return readOnlyCommitLogStorePaths;
}
public void setReadOnlyCommitLogStorePaths(List<String> readOnlyCommitLogStorePaths) {
this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths;
}
public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) {
String[] tokens = readOnlyCommitLogStorePaths.trim().split(":");
List<String> pathList = Arrays.asList(tokens);
Collections.sort(pathList);
this.readOnlyCommitLogStorePaths = pathList;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class MultiPathMappedFileQueueTest {
@Test
public void testGetLastMappedFile() {
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setMultiCommitLogPathEnable(true);
config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % config.getCommitLogStorePaths().size();
assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue();
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
@Test
public void testLoadReadOnlyMappedFiles() {
{
//create old mapped files
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setMultiCommitLogPathEnable(true);
config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % config.getCommitLogStorePaths().size();
assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue();
}
mappedFileQueue.shutdown(1000);
}
// test load and readonly
MessageStoreConfig config = new MessageStoreConfig();
config.setMultiCommitLogPathEnable(true);
config.setCommitLogStorePaths("target/unit_test_store/b/");
config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a:target/unit_test_store/c");
MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
mappedFileQueue.load();
assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024);
mappedFileQueue.destroy();
}
@Test
public void testUpdatePathsOnline() {
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setMultiCommitLogPathEnable(true);
config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % config.getCommitLogStorePaths().size();
assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue();
if (i == 500) {
config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/");
assertThat(config.getCommitLogStorePaths().size()).isEqualTo(2);
}
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册