From 0a31de1cfd71f77cf3f167fdd9fff2b63fdd6123 Mon Sep 17 00:00:00 2001 From: Jason918 Date: Thu, 21 Feb 2019 17:44:49 +0800 Subject: [PATCH] [RIP-7] Multiple Directories Storage Support (#751) * Update issue_template.md * Update README.md * RIP-7 Multiple Directories Storage Suppor * add readonly commit log paths * update * finish test * add test for testUpdatePathsOnline * bug fix * code format --- .github/ISSUE_TEMPLATE/issue_template.md | 2 +- README.md | 3 +- .../processor/SendMessageProcessor.java | 11 +- .../org/apache/rocketmq/store/CommitLog.java | 12 +- .../rocketmq/store/DefaultMessageStore.java | 56 +++++---- .../rocketmq/store/MappedFileQueue.java | 111 ++++++++++-------- .../store/MultiPathMappedFileQueue.java | 98 ++++++++++++++++ .../store/config/MessageStoreConfig.java | 47 ++++++++ .../store/MultiPathMappedFileQueueTest.java | 103 ++++++++++++++++ 9 files changed, 368 insertions(+), 75 deletions(-) create mode 100644 store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java create mode 100644 store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java diff --git a/.github/ISSUE_TEMPLATE/issue_template.md b/.github/ISSUE_TEMPLATE/issue_template.md index a77fb61d..1c8fa94a 100644 --- a/.github/ISSUE_TEMPLATE/issue_template.md +++ b/.github/ISSUE_TEMPLATE/issue_template.md @@ -4,7 +4,7 @@ about: Describe this issue template's purpose here. --- -The issue tracker is **ONLY** used for bug report and feature request. Keep in mind, please check whether there is an existing same report before your raise a new one. +The issue tracker is **ONLY** used for bug report(feature request need to follow [RIP process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)). Keep in mind, please check whether there is an existing same report before your raise a new one. Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration and other improvements. diff --git a/README.md b/README.md index 478f32fa..35817421 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ It offers a variety of features: * Home: * Docs: * Issues: +* Rips: * Ask: * Slack: @@ -43,7 +44,7 @@ It offers a variety of features: ---------- ## Contributing -We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/). +We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/). ---------- ## License diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index b7e7a618..6bae91f0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -560,8 +560,15 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } private String diskUtil() { - String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + double physicRatio = -1; + if (this.brokerController.getMessageStoreConfig().isMultiCommitLogPathEnable()) { + for (String storePathPhysic : this.brokerController.getMessageStoreConfig().getCommitLogStorePaths()) { + physicRatio = Math.max(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic)); + } + } else { + String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); + physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + } String storePathLogis = StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 03b11516..d920c411 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -64,8 +64,16 @@ public class CommitLog { private final PutMessageLock putMessageLock; public CommitLog(final DefaultMessageStore defaultMessageStore) { - this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), - defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); + if (defaultMessageStore.getMessageStoreConfig().isMultiCommitLogPathEnable()) { + this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(), + defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), + defaultMessageStore.getAllocateMappedFileService()); + } else { + this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), + defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), + defaultMessageStore.getAllocateMappedFileService()); + } + this.defaultMessageStore = defaultMessageStore; if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index e0aef4f3..86dd6893 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -693,11 +694,18 @@ public class DefaultMessageStore implements MessageStore { public HashMap getRuntimeInfo() { HashMap result = this.storeStatsService.getRuntimeInfo(); - { + if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) { + double maxValue = Double.MIN_VALUE; + for (String clPath : DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths()) { + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(clPath); + result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio)); + maxValue = Math.max(maxValue, physicRatio); + } + result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(maxValue)); + } else { String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); - } { @@ -1537,27 +1545,35 @@ public class DefaultMessageStore implements MessageStore { cleanImmediately = false; { - String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); - if (physicRatio > diskSpaceWarningLevelRatio) { - boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); - if (diskok) { - DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); - } - - cleanImmediately = true; - } else if (physicRatio > diskSpaceCleanForciblyRatio) { - cleanImmediately = true; + List storePaths; + if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) { + storePaths = DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths(); } else { - boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); - if (!diskok) { - DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); - } + storePaths = Collections.singletonList(DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog()); } - if (physicRatio < 0 || physicRatio > ratio) { - DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); - return true; + for (String storePathPhysic : storePaths) { + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + if (physicRatio > diskSpaceWarningLevelRatio) { + boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); + if (diskok) { + DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full, storePathPhysic=" + storePathPhysic); + } + + cleanImmediately = true; + } else if (physicRatio > diskSpaceCleanForciblyRatio) { + cleanImmediately = true; + } else { + boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); + if (!diskok) { + DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok, storePathPhysic=" + storePathPhysic); + } + } + + if (physicRatio < 0 || physicRatio > ratio) { + DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio + ", storePathPhysic=" + storePathPhysic); + return true; + } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 86de3d2c..be29e6cb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -37,13 +38,13 @@ public class MappedFileQueue { private final String storePath; - private final int mappedFileSize; + protected final int mappedFileSize; - private final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); + protected final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); private final AllocateMappedFileService allocateMappedFileService; - private long flushedWhere = 0; + protected long flushedWhere = 0; private long committedWhere = 0; private volatile long storeTimestamp = 0; @@ -144,35 +145,39 @@ public class MappedFileQueue { } } + public boolean load() { File dir = new File(this.storePath); - File[] files = dir.listFiles(); - if (files != null) { - // ascending order - Arrays.sort(files); - for (File file : files) { - - if (file.length() != this.mappedFileSize) { - log.warn(file + "\t" + file.length() + File[] ls = dir.listFiles(); + if (ls != null) { + return doLoad(Arrays.asList(ls)); + } + return true; + } + + public boolean doLoad(List files) { + // ascending order + Collections.sort(files); + for (File file : files) { + if (file.length() != this.mappedFileSize) { + log.warn(file + "\t" + file.length() + " length not matched message store config value, ignore it"); - return true; - } + return true; + } - try { - MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); - - mappedFile.setWrotePosition(this.mappedFileSize); - mappedFile.setFlushedPosition(this.mappedFileSize); - mappedFile.setCommittedPosition(this.mappedFileSize); - this.mappedFiles.add(mappedFile); - log.info("load " + file.getPath() + " OK"); - } catch (IOException e) { - log.error("load file " + file + " error", e); - return false; - } + try { + MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); + + mappedFile.setWrotePosition(this.mappedFileSize); + mappedFile.setFlushedPosition(this.mappedFileSize); + mappedFile.setCommittedPosition(this.mappedFileSize); + this.mappedFiles.add(mappedFile); + log.info("load " + file.getPath() + " OK"); + } catch (IOException e) { + log.error("load file " + file + " error", e); + return false; } } - return true; } @@ -204,33 +209,41 @@ public class MappedFileQueue { } if (createOffset != -1 && needCreate) { - String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); - String nextNextFilePath = this.storePath + File.separator - + UtilAll.offset2FileName(createOffset + this.mappedFileSize); - MappedFile mappedFile = null; + return tryCreateMappedFile(createOffset); + } + + return mappedFileLast; + } + + protected MappedFile tryCreateMappedFile(long createOffset) { + String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); + String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset + + this.mappedFileSize); + return doCreateMappedFile(nextFilePath, nextNextFilePath); + } - if (this.allocateMappedFileService != null) { - mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, + protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) { + MappedFile mappedFile = null; + + if (this.allocateMappedFileService != null) { + mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this.mappedFileSize); - } else { - try { - mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); - } catch (IOException e) { - log.error("create mappedFile exception", e); - } + } else { + try { + mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); + } catch (IOException e) { + log.error("create mappedFile exception", e); } + } - if (mappedFile != null) { - if (this.mappedFiles.isEmpty()) { - mappedFile.setFirstCreateInQueue(true); - } - this.mappedFiles.add(mappedFile); + if (mappedFile != null) { + if (this.mappedFiles.isEmpty()) { + mappedFile.setFirstCreateInQueue(true); } - - return mappedFile; + this.mappedFiles.add(mappedFile); } - return mappedFileLast; + return mappedFile; } public MappedFile getLastMappedFile(final long startOffset) { @@ -398,7 +411,7 @@ public class MappedFileQueue { destroy = maxOffsetInLogicQueue < offset; if (destroy) { log.info("physic min offset " + offset + ", logics in current mappedFile max offset " - + maxOffsetInLogicQueue + ", delete it"); + + maxOffsetInLogicQueue + ", delete it"); } } else if (!mappedFile.isAvailable()) { // Handle hanged file. log.warn("Found a hanged consume queue file, attempting to delete it."); @@ -466,7 +479,7 @@ public class MappedFileQueue { if (firstMappedFile != null && lastMappedFile != null) { if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", - offset, + offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this.mappedFileSize, this.mappedFileSize, @@ -480,7 +493,7 @@ public class MappedFileQueue { } if (targetFile != null && offset >= targetFile.getFileFromOffset() - && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { + && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java new file mode 100644 index 00000000..78ef4e00 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.MessageStoreConfig; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class MultiPathMappedFileQueue extends MappedFileQueue { + + private final MessageStoreConfig config; + + public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize, + AllocateMappedFileService allocateMappedFileService) { + super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService); + this.config = messageStoreConfig; + } + + + @Override + public boolean load() { + List files = new ArrayList<>(); + for (String path : config.getCommitLogStorePaths()) { + File dir = new File(path); + File[] ls = dir.listFiles(); + if (ls != null) { + Collections.addAll(files, ls); + } + } + if (config.getReadOnlyCommitLogStorePaths() != null) { + for (String path : config.getReadOnlyCommitLogStorePaths()) { + File dir = new File(path); + File[] ls = dir.listFiles(); + if (ls != null) { + Collections.addAll(files, ls); + } + } + } + + return doLoad(files); + } + + @Override + protected MappedFile tryCreateMappedFile(long createOffset) { + long fileIdx = createOffset / this.mappedFileSize; + List pathList = config.getCommitLogStorePaths(); + String nextFilePath = pathList.get((int) (fileIdx % pathList.size())) + File.separator + + UtilAll.offset2FileName(createOffset); + String nextNextFilePath = pathList.get((int) ((fileIdx + 1) % pathList.size())) + File.separator + + UtilAll.offset2FileName(createOffset + this.mappedFileSize); + return doCreateMappedFile(nextFilePath, nextNextFilePath); + } + + @Override + public void destroy() { + for (MappedFile mf : this.mappedFiles) { + mf.destroy(1000 * 3); + } + this.mappedFiles.clear(); + this.flushedWhere = 0; + + if (config.getCommitLogStorePaths() != null) { + for (String path : config.getCommitLogStorePaths()) { + File file = new File(path); + if (file.isDirectory()) { + file.delete(); + } + } + } + if (config.getReadOnlyCommitLogStorePaths() != null) { + for (String path : config.getReadOnlyCommitLogStorePaths()) { + File file = new File(path); + if (file.isDirectory()) { + file.delete(); + } + } + } + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 8d60321e..7218c35e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -17,6 +17,10 @@ package org.apache.rocketmq.store.config; import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.store.ConsumeQueue; @@ -30,6 +34,12 @@ public class MessageStoreConfig { private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; + private boolean multiCommitLogPathEnable = false; + + private List commitLogStorePaths = null; + + private List readOnlyCommitLogStorePaths = null; + // CommitLog file size,default is 1G private int mapedFileSizeCommitLog = 1024 * 1024 * 1024; // ConsumeQueue file size,default is 30W @@ -666,4 +676,41 @@ public class MessageStoreConfig { this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; } + public boolean isMultiCommitLogPathEnable() { + return multiCommitLogPathEnable; + } + + public void setMultiCommitLogPathEnable(boolean multiCommitLogPathEnable) { + this.multiCommitLogPathEnable = multiCommitLogPathEnable; + } + + public List getCommitLogStorePaths() { + return commitLogStorePaths; + } + + public void setCommitLogStorePaths(String commitLogStorePaths) { + String[] tokens = commitLogStorePaths.trim().split(":"); + List pathList = Arrays.asList(tokens); + Collections.sort(pathList); + this.commitLogStorePaths = pathList; + } + + public void setCommitLogStorePaths(List commitLogStorePaths) { + this.commitLogStorePaths = commitLogStorePaths; + } + + public List getReadOnlyCommitLogStorePaths() { + return readOnlyCommitLogStorePaths; + } + + public void setReadOnlyCommitLogStorePaths(List readOnlyCommitLogStorePaths) { + this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths; + } + + public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) { + String[] tokens = readOnlyCommitLogStorePaths.trim().split(":"); + List pathList = Arrays.asList(tokens); + Collections.sort(pathList); + this.readOnlyCommitLogStorePaths = pathList; + } } diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java new file mode 100644 index 00000000..1d0a6e5f --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + + +public class MultiPathMappedFileQueueTest { + + @Test + public void testGetLastMappedFile() { + final byte[] fixedMsg = new byte[1024]; + + MessageStoreConfig config = new MessageStoreConfig(); + config.setMultiCommitLogPathEnable(true); + config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % config.getCommitLogStorePaths().size(); + assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue(); + } + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } + + @Test + public void testLoadReadOnlyMappedFiles() { + { + //create old mapped files + final byte[] fixedMsg = new byte[1024]; + MessageStoreConfig config = new MessageStoreConfig(); + config.setMultiCommitLogPathEnable(true); + config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % config.getCommitLogStorePaths().size(); + assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue(); + } + mappedFileQueue.shutdown(1000); + } + + // test load and readonly + MessageStoreConfig config = new MessageStoreConfig(); + config.setMultiCommitLogPathEnable(true); + config.setCommitLogStorePaths("target/unit_test_store/b/"); + config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a:target/unit_test_store/c"); + MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null); + + mappedFileQueue.load(); + + assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024); + mappedFileQueue.destroy(); + + } + + @Test + public void testUpdatePathsOnline() { + final byte[] fixedMsg = new byte[1024]; + + MessageStoreConfig config = new MessageStoreConfig(); + config.setMultiCommitLogPathEnable(true); + config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % config.getCommitLogStorePaths().size(); + assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue(); + + if (i == 500) { + config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/"); + assertThat(config.getCommitLogStorePaths().size()).isEqualTo(2); + } + } + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } +} \ No newline at end of file -- GitLab