diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 6942c88d870812a4d49e5574d0a93ad6c3d3eb99..a11fb6cc1e544522d140b1438b3721820da16979 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -186,7 +187,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } } - if (msgExt.getReconsumeTimes() >= maxReconsumeTimes + if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP; @@ -635,8 +636,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } private String diskUtil() { - String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + double physicRatio = -1; + String storePath =this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); + if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (String storePathPhysic : paths) { + physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic)); + } + } else { + physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePath); + } String storePathLogis = StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 32e9a657d1eb4c6d966bf98669fc9cc18e325ea5..3a89dc849954c49c0b5071d8985c5fa4fc7a7291 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -22,10 +22,12 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -43,6 +45,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.schedule.ScheduleMessageService; @@ -71,9 +74,20 @@ public class CommitLog { protected final PutMessageLock putMessageLock; + private volatile Set fullStorePaths = Collections.emptySet(); + public CommitLog(final DefaultMessageStore defaultMessageStore) { - this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), - defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); + String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(); + if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { + this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(), + defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), + defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths); + } else { + this.mappedFileQueue = new MappedFileQueue(storePath, + defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), + defaultMessageStore.getAllocateMappedFileService()); + } + this.defaultMessageStore = defaultMessageStore; if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -95,6 +109,14 @@ public class CommitLog { } + public void setFullStorePaths(Set fullStorePaths) { + this.fullStorePaths = fullStorePaths; + } + + public Set getFullStorePaths() { + return fullStorePaths; + } + public boolean load() { boolean result = this.mappedFileQueue.load(); log.info("load commit log " + (result ? "OK" : "Failed")); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 5bf68ac5fc70a32fd1cf8a73afbcffa31c8254b9..942c10b7266c160ea98ec92f5bd3aec38f5c545d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileLock; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; @@ -782,10 +783,20 @@ public class DefaultMessageStore implements MessageStore { public HashMap getRuntimeInfo() { HashMap result = this.storeStatsService.getRuntimeInfo(); - { - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); + String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); + if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { + double maxValue = Double.MIN_VALUE; + String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (String clPath : paths) { + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(clPath); + result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio)); + maxValue = Math.max(maxValue, physicRatio); + } + result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(maxValue)); + } else { + String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); - } { @@ -1650,25 +1661,49 @@ public class DefaultMessageStore implements MessageStore { cleanImmediately = false; { - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); - if (physicRatio > diskSpaceWarningLevelRatio) { + String[] storePaths; + String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); + if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { + storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + } else { + storePaths = new String[]{commitLogStorePath}; + } + + Set fullStorePath = new HashSet<>(); + double minPhysicRatio = 100; + String minStorePath = null; + for (String storePathPhysic : storePaths) { + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + if (minPhysicRatio > physicRatio) { + minPhysicRatio = physicRatio; + minStorePath = storePathPhysic; + } + if (physicRatio > diskSpaceCleanForciblyRatio) { + fullStorePath.add(storePathPhysic); + } + } + DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); + if (minPhysicRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { - DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); + DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio + + ", so mark disk full, storePathPhysic=" + minStorePath); } cleanImmediately = true; - } else if (physicRatio > diskSpaceCleanForciblyRatio) { + } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { - DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); + DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio + + ", so mark disk ok, storePathPhysic=" + minStorePath); } } - if (physicRatio < 0 || physicRatio > ratio) { - DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); + if (minPhysicRatio < 0 || minPhysicRatio > ratio) { + DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + + minPhysicRatio + ", storePathPhysic=" + minStorePath); return true; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index cc145921cef90239ca13dc5ff9b3714e2f4a27c2..be29e6cb64613277733f373aaa7dd6ffda56b102 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -37,13 +38,13 @@ public class MappedFileQueue { private final String storePath; - private final int mappedFileSize; + protected final int mappedFileSize; - private final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); + protected final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); private final AllocateMappedFileService allocateMappedFileService; - private long flushedWhere = 0; + protected long flushedWhere = 0; private long committedWhere = 0; private volatile long storeTimestamp = 0; @@ -144,35 +145,39 @@ public class MappedFileQueue { } } + public boolean load() { File dir = new File(this.storePath); - File[] files = dir.listFiles(); - if (files != null) { - // ascending order - Arrays.sort(files); - for (File file : files) { - - if (file.length() != this.mappedFileSize) { - log.warn(file + "\t" + file.length() - + " length not matched message store config value, please check it manually"); - return false; - } + File[] ls = dir.listFiles(); + if (ls != null) { + return doLoad(Arrays.asList(ls)); + } + return true; + } - try { - MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); - - mappedFile.setWrotePosition(this.mappedFileSize); - mappedFile.setFlushedPosition(this.mappedFileSize); - mappedFile.setCommittedPosition(this.mappedFileSize); - this.mappedFiles.add(mappedFile); - log.info("load " + file.getPath() + " OK"); - } catch (IOException e) { - log.error("load file " + file + " error", e); - return false; - } + public boolean doLoad(List files) { + // ascending order + Collections.sort(files); + for (File file : files) { + if (file.length() != this.mappedFileSize) { + log.warn(file + "\t" + file.length() + + " length not matched message store config value, ignore it"); + return true; } - } + try { + MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); + + mappedFile.setWrotePosition(this.mappedFileSize); + mappedFile.setFlushedPosition(this.mappedFileSize); + mappedFile.setCommittedPosition(this.mappedFileSize); + this.mappedFiles.add(mappedFile); + log.info("load " + file.getPath() + " OK"); + } catch (IOException e) { + log.error("load file " + file + " error", e); + return false; + } + } return true; } @@ -204,33 +209,41 @@ public class MappedFileQueue { } if (createOffset != -1 && needCreate) { - String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); - String nextNextFilePath = this.storePath + File.separator - + UtilAll.offset2FileName(createOffset + this.mappedFileSize); - MappedFile mappedFile = null; + return tryCreateMappedFile(createOffset); + } + + return mappedFileLast; + } + + protected MappedFile tryCreateMappedFile(long createOffset) { + String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); + String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset + + this.mappedFileSize); + return doCreateMappedFile(nextFilePath, nextNextFilePath); + } - if (this.allocateMappedFileService != null) { - mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, + protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) { + MappedFile mappedFile = null; + + if (this.allocateMappedFileService != null) { + mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this.mappedFileSize); - } else { - try { - mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); - } catch (IOException e) { - log.error("create mappedFile exception", e); - } + } else { + try { + mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); + } catch (IOException e) { + log.error("create mappedFile exception", e); } + } - if (mappedFile != null) { - if (this.mappedFiles.isEmpty()) { - mappedFile.setFirstCreateInQueue(true); - } - this.mappedFiles.add(mappedFile); + if (mappedFile != null) { + if (this.mappedFiles.isEmpty()) { + mappedFile.setFirstCreateInQueue(true); } - - return mappedFile; + this.mappedFiles.add(mappedFile); } - return mappedFileLast; + return mappedFile; } public MappedFile getLastMappedFile(final long startOffset) { @@ -398,7 +411,7 @@ public class MappedFileQueue { destroy = maxOffsetInLogicQueue < offset; if (destroy) { log.info("physic min offset " + offset + ", logics in current mappedFile max offset " - + maxOffsetInLogicQueue + ", delete it"); + + maxOffsetInLogicQueue + ", delete it"); } } else if (!mappedFile.isAvailable()) { // Handle hanged file. log.warn("Found a hanged consume queue file, attempting to delete it."); @@ -466,7 +479,7 @@ public class MappedFileQueue { if (firstMappedFile != null && lastMappedFile != null) { if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", - offset, + offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this.mappedFileSize, this.mappedFileSize, @@ -480,7 +493,7 @@ public class MappedFileQueue { } if (targetFile != null && offset >= targetFile.getFileFromOffset() - && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { + && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..d7a413228c894aeb8eacf5a3e6d05116988f4e9f --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.MessageStoreConfig; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class MultiPathMappedFileQueue extends MappedFileQueue { + + private final MessageStoreConfig config; + private final Supplier> fullStorePathsSupplier; + + public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize, + AllocateMappedFileService allocateMappedFileService, + Supplier> fullStorePathsSupplier) { + super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService); + this.config = messageStoreConfig; + this.fullStorePathsSupplier = fullStorePathsSupplier; + } + + private Set getPaths() { + String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + return new HashSet<>(Arrays.asList(paths)); + } + + private Set getReadonlyPaths() { + String pathStr = config.getReadOnlyCommitLogStorePaths(); + if (StringUtils.isBlank(pathStr)) { + return Collections.emptySet(); + } + String[] paths = pathStr.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + return new HashSet<>(Arrays.asList(paths)); + } + + @Override + public boolean load() { + Set storePathSet = getPaths(); + storePathSet.addAll(getReadonlyPaths()); + + List files = new ArrayList<>(); + for (String path : storePathSet) { + File dir = new File(path); + File[] ls = dir.listFiles(); + if (ls != null) { + Collections.addAll(files, ls); + } + } + + return doLoad(files); + } + + @Override + protected MappedFile tryCreateMappedFile(long createOffset) { + long fileIdx = createOffset / this.mappedFileSize; + Set storePath = getPaths(); + Set readonlyPathSet = getReadonlyPaths(); + Set fullStorePaths = + fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get(); + + + HashSet availableStorePath = new HashSet<>(storePath); + //do not create file in readonly store path. + availableStorePath.removeAll(readonlyPathSet); + + //do not create file is space is nearly full. + availableStorePath.removeAll(fullStorePaths); + + //if no store path left, fall back to wriable store path. + if (availableStorePath.isEmpty()) { + availableStorePath = new HashSet<>(storePath); + availableStorePath.removeAll(readonlyPathSet); + } + + String[] paths = availableStorePath.toArray(new String[]{}); + Arrays.sort(paths); + String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator + + UtilAll.offset2FileName(createOffset); + String nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator + + UtilAll.offset2FileName(createOffset + this.mappedFileSize); + return doCreateMappedFile(nextFilePath, nextNextFilePath); + } + + @Override + public void destroy() { + for (MappedFile mf : this.mappedFiles) { + mf.destroy(1000 * 3); + } + this.mappedFiles.clear(); + this.flushedWhere = 0; + + + Set storePathSet = getPaths(); + storePathSet.addAll(getReadonlyPaths()); + + for (String path : storePathSet) { + File file = new File(path); + if (file.isDirectory()) { + file.delete(); + } + } + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 848fe22decdb211064003ae1e7c6b7cf832da55d..113fba093c6c4bf8af1bc05c7cb138f6314e985f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -17,10 +17,17 @@ package org.apache.rocketmq.store.config; import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.store.ConsumeQueue; public class MessageStoreConfig { + + public static final String MULTI_PATH_SPLITTER = ":"; + //The root directory in which the log data is kept @ImportantField private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; @@ -30,6 +37,8 @@ public class MessageStoreConfig { private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; + private String readOnlyCommitLogStorePaths = null; + // CommitLog file size,default is 1G private int mappedFileSizeCommitLog = 1024 * 1024 * 1024; // ConsumeQueue file size,default is 30W @@ -676,6 +685,13 @@ public class MessageStoreConfig { this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; } + public String getReadOnlyCommitLogStorePaths() { + return readOnlyCommitLogStorePaths; + } + + public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) { + this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths; + } public String getdLegerGroup() { return dLegerGroup; } diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d299df15987a0c29dc9633f78397c33b48272a7d --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import static org.assertj.core.api.Assertions.assertThat; +import java.util.HashSet; +import java.util.Set; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Test; + + +public class MultiPathMappedFileQueueTest { + + @Test + public void testGetLastMappedFile() { + final byte[] fixedMsg = new byte[1024]; + + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % storePaths.length; + assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue(); + } + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } + + @Test + public void testLoadReadOnlyMappedFiles() { + { + //create old mapped files + final byte[] fixedMsg = new byte[1024]; + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog( + "target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % storePaths.length; + assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue(); + } + mappedFileQueue.shutdown(1000); + } + + // test load and readonly + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/b/"); + config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a:target/unit_test_store/c"); + MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + + mappedFileQueue.load(); + + assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1025); + mappedFileQueue.destroy(); + + } + + @Test + public void testUpdatePathsOnline() { + final byte[] fixedMsg = new byte[1024]; + + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % storePaths.length; + assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue(); + + if (i == 500) { + config.setStorePathCommitLog("target/unit_test_store/a/:target/unit_test_store/b/"); + storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + } + } + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } + + @Test + public void testFullStorePath() { + final byte[] fixedMsg = new byte[1024]; + + Set fullStorePath = new HashSet<>(); + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, () -> fullStorePath); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + assertThat(storePaths.length).isEqualTo(3); + + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue(); + + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length); + assertThat(mappedFile.getFileName().startsWith(storePaths[1])).isTrue(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 2); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue(); + + fullStorePath.add("target/unit_test_store/b/"); + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 3); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue(); + + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 4); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue(); + + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } +} \ No newline at end of file