diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 3a401e1a47755886569785ac1f6b81eb61134f12..c4f470cd568216c4dbe547b08594e31247bd57ec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -186,7 +187,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } } - if (msgExt.getReconsumeTimes() >= maxReconsumeTimes + if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP; @@ -636,8 +637,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } private String diskUtil() { - String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + double physicRatio = 100; + String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (String storePathPhysic : paths) { + physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic)); + } String storePathLogis = StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 457deb8dba8dcf347e1650aa0a11c359458eb56d..9971f334342c696bcd2c20601fcd9524d6a90e8d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -198,6 +198,11 @@ public class UtilAll { cal.get(Calendar.SECOND)); } + public static boolean isPathExists(final String path) { + File file = new File(path); + return file.exists(); + } + public static double getDiskPartitionSpaceUsedPercent(final String path) { if (null == path || path.isEmpty()) { log.error("Error when measuring disk space usage, path is null or empty, path : {}", path); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 32e9a657d1eb4c6d966bf98669fc9cc18e325ea5..3a89dc849954c49c0b5071d8985c5fa4fc7a7291 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -22,10 +22,12 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -43,6 +45,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.schedule.ScheduleMessageService; @@ -71,9 +74,20 @@ public class CommitLog { protected final PutMessageLock putMessageLock; + private volatile Set fullStorePaths = Collections.emptySet(); + public CommitLog(final DefaultMessageStore defaultMessageStore) { - this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), - defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); + String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(); + if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { + this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(), + defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), + defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths); + } else { + this.mappedFileQueue = new MappedFileQueue(storePath, + defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), + defaultMessageStore.getAllocateMappedFileService()); + } + this.defaultMessageStore = defaultMessageStore; if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -95,6 +109,14 @@ public class CommitLog { } + public void setFullStorePaths(Set fullStorePaths) { + this.fullStorePaths = fullStorePaths; + } + + public Set getFullStorePaths() { + return fullStorePaths; + } + public boolean load() { boolean result = this.mappedFileQueue.load(); log.info("load commit log " + (result ? "OK" : "Failed")); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 81344c0e7b3b35307946f7e1eeddeb0ec2c51dd3..bf61789d08900644669057701b74838b3b056610 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileLock; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; @@ -784,15 +785,22 @@ public class DefaultMessageStore implements MessageStore { HashMap result = this.storeStatsService.getRuntimeInfo(); { - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); - result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); - + double minPhysicsUsedRatio = Double.MAX_VALUE; + String commitLogStorePath = getStorePathPhysic(); + String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (String clPath : paths) { + double physicRatio = UtilAll.isPathExists(clPath) ? + UtilAll.getDiskPartitionSpaceUsedPercent(clPath) : -1; + result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio)); + minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio); + } + result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio)); } { - String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()); - double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); + double logicsRatio = UtilAll.isPathExists(storePathLogics) ? + UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics) : -1; result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio)); } @@ -1651,25 +1659,43 @@ public class DefaultMessageStore implements MessageStore { cleanImmediately = false; { - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); - if (physicRatio > diskSpaceWarningLevelRatio) { + String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); + String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + Set fullStorePath = new HashSet<>(); + double minPhysicRatio = 100; + String minStorePath = null; + for (String storePathPhysic : storePaths) { + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + if (minPhysicRatio > physicRatio) { + minPhysicRatio = physicRatio; + minStorePath = storePathPhysic; + } + if (physicRatio > diskSpaceCleanForciblyRatio) { + fullStorePath.add(storePathPhysic); + } + } + DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); + if (minPhysicRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { - DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); + DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio + + ", so mark disk full, storePathPhysic=" + minStorePath); } cleanImmediately = true; - } else if (physicRatio > diskSpaceCleanForciblyRatio) { + } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { - DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); + DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio + + ", so mark disk ok, storePathPhysic=" + minStorePath); } } - if (physicRatio < 0 || physicRatio > ratio) { - DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); + if (minPhysicRatio < 0 || minPhysicRatio > ratio) { + DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + + minPhysicRatio + ", storePathPhysic=" + minStorePath); return true; } } @@ -1710,8 +1736,27 @@ public class DefaultMessageStore implements MessageStore { public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; } + + public double calcStorePathPhysicRatio() { + Set fullStorePath = new HashSet<>(); + String storePath = getStorePathPhysic(); + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + double minPhysicRatio = 100; + for (String path : paths) { + double physicRatio = UtilAll.isPathExists(path) ? + UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1; + minPhysicRatio = Math.min(minPhysicRatio, physicRatio); + if (physicRatio > diskSpaceCleanForciblyRatio) { + fullStorePath.add(path); + } + } + DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); + return minPhysicRatio; + + } + public boolean isSpaceFull() { - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); + double physicRatio = calcStorePathPhysicRatio(); double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; if (physicRatio > ratio) { DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio); diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index cc145921cef90239ca13dc5ff9b3714e2f4a27c2..1f25e61225aa5efcd1c377ebefe36b2b821ac83c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -37,13 +38,13 @@ public class MappedFileQueue { private final String storePath; - private final int mappedFileSize; + protected final int mappedFileSize; - private final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); + protected final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); private final AllocateMappedFileService allocateMappedFileService; - private long flushedWhere = 0; + protected long flushedWhere = 0; private long committedWhere = 0; private volatile long storeTimestamp = 0; @@ -144,35 +145,40 @@ public class MappedFileQueue { } } + public boolean load() { File dir = new File(this.storePath); - File[] files = dir.listFiles(); - if (files != null) { - // ascending order - Arrays.sort(files); - for (File file : files) { - - if (file.length() != this.mappedFileSize) { - log.warn(file + "\t" + file.length() - + " length not matched message store config value, please check it manually"); - return false; - } + File[] ls = dir.listFiles(); + if (ls != null) { + return doLoad(Arrays.asList(ls)); + } + return true; + } - try { - MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); - - mappedFile.setWrotePosition(this.mappedFileSize); - mappedFile.setFlushedPosition(this.mappedFileSize); - mappedFile.setCommittedPosition(this.mappedFileSize); - this.mappedFiles.add(mappedFile); - log.info("load " + file.getPath() + " OK"); - } catch (IOException e) { - log.error("load file " + file + " error", e); - return false; - } + public boolean doLoad(List files) { + // ascending order + files.sort(Comparator.comparing(File::getName)); + + for (File file : files) { + if (file.length() != this.mappedFileSize) { + log.warn(file + "\t" + file.length() + + " length not matched message store config value, ignore it"); + return true; } - } + try { + MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); + + mappedFile.setWrotePosition(this.mappedFileSize); + mappedFile.setFlushedPosition(this.mappedFileSize); + mappedFile.setCommittedPosition(this.mappedFileSize); + this.mappedFiles.add(mappedFile); + log.info("load " + file.getPath() + " OK"); + } catch (IOException e) { + log.error("load file " + file + " error", e); + return false; + } + } return true; } @@ -204,33 +210,41 @@ public class MappedFileQueue { } if (createOffset != -1 && needCreate) { - String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); - String nextNextFilePath = this.storePath + File.separator - + UtilAll.offset2FileName(createOffset + this.mappedFileSize); - MappedFile mappedFile = null; + return tryCreateMappedFile(createOffset); + } - if (this.allocateMappedFileService != null) { - mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, + return mappedFileLast; + } + + protected MappedFile tryCreateMappedFile(long createOffset) { + String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); + String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset + + this.mappedFileSize); + return doCreateMappedFile(nextFilePath, nextNextFilePath); + } + + protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) { + MappedFile mappedFile = null; + + if (this.allocateMappedFileService != null) { + mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this.mappedFileSize); - } else { - try { - mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); - } catch (IOException e) { - log.error("create mappedFile exception", e); - } + } else { + try { + mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); + } catch (IOException e) { + log.error("create mappedFile exception", e); } + } - if (mappedFile != null) { - if (this.mappedFiles.isEmpty()) { - mappedFile.setFirstCreateInQueue(true); - } - this.mappedFiles.add(mappedFile); + if (mappedFile != null) { + if (this.mappedFiles.isEmpty()) { + mappedFile.setFirstCreateInQueue(true); } - - return mappedFile; + this.mappedFiles.add(mappedFile); } - return mappedFileLast; + return mappedFile; } public MappedFile getLastMappedFile(final long startOffset) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..669698ff4e99c5e1e5b43a4baefa3ac70213407a --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.MessageStoreConfig; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class MultiPathMappedFileQueue extends MappedFileQueue { + + private final MessageStoreConfig config; + private final Supplier> fullStorePathsSupplier; + + public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize, + AllocateMappedFileService allocateMappedFileService, + Supplier> fullStorePathsSupplier) { + super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService); + this.config = messageStoreConfig; + this.fullStorePathsSupplier = fullStorePathsSupplier; + } + + private Set getPaths() { + String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + return new HashSet<>(Arrays.asList(paths)); + } + + private Set getReadonlyPaths() { + String pathStr = config.getReadOnlyCommitLogStorePaths(); + if (StringUtils.isBlank(pathStr)) { + return Collections.emptySet(); + } + String[] paths = pathStr.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + return new HashSet<>(Arrays.asList(paths)); + } + + @Override + public boolean load() { + Set storePathSet = getPaths(); + storePathSet.addAll(getReadonlyPaths()); + + List files = new ArrayList<>(); + for (String path : storePathSet) { + File dir = new File(path); + File[] ls = dir.listFiles(); + if (ls != null) { + Collections.addAll(files, ls); + } + } + + return doLoad(files); + } + + @Override + protected MappedFile tryCreateMappedFile(long createOffset) { + long fileIdx = createOffset / this.mappedFileSize; + Set storePath = getPaths(); + Set readonlyPathSet = getReadonlyPaths(); + Set fullStorePaths = + fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get(); + + + HashSet availableStorePath = new HashSet<>(storePath); + //do not create file in readonly store path. + availableStorePath.removeAll(readonlyPathSet); + + //do not create file is space is nearly full. + availableStorePath.removeAll(fullStorePaths); + + //if no store path left, fall back to writable store path. + if (availableStorePath.isEmpty()) { + availableStorePath = new HashSet<>(storePath); + availableStorePath.removeAll(readonlyPathSet); + } + + String[] paths = availableStorePath.toArray(new String[]{}); + Arrays.sort(paths); + String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator + + UtilAll.offset2FileName(createOffset); + String nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator + + UtilAll.offset2FileName(createOffset + this.mappedFileSize); + return doCreateMappedFile(nextFilePath, nextNextFilePath); + } + + @Override + public void destroy() { + for (MappedFile mf : this.mappedFiles) { + mf.destroy(1000 * 3); + } + this.mappedFiles.clear(); + this.flushedWhere = 0; + + + Set storePathSet = getPaths(); + storePathSet.addAll(getReadonlyPaths()); + + for (String path : storePathSet) { + File file = new File(path); + if (file.isDirectory()) { + file.delete(); + } + } + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 848fe22decdb211064003ae1e7c6b7cf832da55d..e1439a0c9d8260be9c7e9fe949b364726d9f84b2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -17,10 +17,14 @@ package org.apache.rocketmq.store.config; import java.io.File; + import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.store.ConsumeQueue; public class MessageStoreConfig { + + public static final String MULTI_PATH_SPLITTER = System.getProperty("rocketmq.broker.multiPathSplitter", ","); + //The root directory in which the log data is kept @ImportantField private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; @@ -30,6 +34,8 @@ public class MessageStoreConfig { private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; + private String readOnlyCommitLogStorePaths = null; + // CommitLog file size,default is 1G private int mappedFileSizeCommitLog = 1024 * 1024 * 1024; // ConsumeQueue file size,default is 30W @@ -676,6 +682,13 @@ public class MessageStoreConfig { this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; } + public String getReadOnlyCommitLogStorePaths() { + return readOnlyCommitLogStorePaths; + } + + public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) { + this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths; + } public String getdLegerGroup() { return dLegerGroup; } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index 7455dfea3c811381d86a7e733aea0394479eec7e..69c1673ee48b792b7b63513297a4e851ff2a7867 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -94,6 +94,41 @@ public class DefaultMessageStoreCleanFilesTest { } + @Test + public void testIsSpaceFullMultiCommitLogStorePath() throws Exception { + String deleteWhen = "04"; + // the min value of diskMaxUsedSpaceRatio. + int diskMaxUsedSpaceRatio = 1; + // used to set disk-full flag + double diskSpaceCleanForciblyRatio = 0.01D; + MessageStoreConfig config = genMessageStoreConfig(deleteWhen, diskMaxUsedSpaceRatio); + String storePath = config.getStorePathCommitLog(); + StringBuilder storePathBuilder = new StringBuilder(); + for (int i = 0; i < 3; i++) { + storePathBuilder.append(storePath).append(i).append(MessageStoreConfig.MULTI_PATH_SPLITTER); + } + config.setStorePathCommitLog(storePathBuilder.toString()); + String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + assertEquals(3, paths.length); + initMessageStore(config, diskSpaceCleanForciblyRatio); + + + + // build and put 55 messages, exactly one message per CommitLog file. + buildAndPutMessagesToMessageStore(msgCount); + MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog(); + assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size()); + int fileCountConsumeQueue = getFileCountConsumeQueue(); + MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue(); + assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size()); + cleanCommitLogService.isSpaceFull(); + + assertEquals(1 << 4, messageStore.getRunningFlags().getFlagBits() & (1 << 4)); + messageStore.shutdown(); + messageStore.destroy(); + + } + @Test public void testIsSpaceFullFunctionFull2Empty() throws Exception { String deleteWhen = "04"; @@ -421,6 +456,10 @@ public class DefaultMessageStoreCleanFilesTest { } private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, double diskSpaceCleanForciblyRatio) throws Exception { + initMessageStore(genMessageStoreConfig(deleteWhen,diskMaxUsedSpaceRatio), diskSpaceCleanForciblyRatio); + } + + private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxUsedSpaceRatio) { MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest(); messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize); messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize); @@ -442,7 +481,10 @@ public class DefaultMessageStoreCleanFilesTest { String storePathCommitLog = storePathRootDir + File.separator + "commitlog"; messageStoreConfig.setStorePathRootDir(storePathRootDir); messageStoreConfig.setStorePathCommitLog(storePathCommitLog); + return messageStoreConfig; + } + private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception { messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java new file mode 100644 index 0000000000000000000000000000000000000000..66b3f93b01f8db4449b427dfdeaa087523254d7c --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import static org.assertj.core.api.Assertions.assertThat; +import java.util.HashSet; +import java.util.Set; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Test; + + +public class MultiPathMappedFileQueueTest { + + @Test + public void testGetLastMappedFile() { + final byte[] fixedMsg = new byte[1024]; + + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % storePaths.length; + assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue(); + } + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } + + @Test + public void testLoadReadOnlyMappedFiles() { + { + //create old mapped files + final byte[] fixedMsg = new byte[1024]; + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % storePaths.length; + assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue(); + } + mappedFileQueue.shutdown(1000); + } + + // test load and readonly + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/b/"); + config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c"); + MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + + mappedFileQueue.load(); + + assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024); + for (int i = 0; i < 1024; i++) { + assertThat(mappedFileQueue.mappedFiles.get(i).getFile().getName()) + .isEqualTo(UtilAll.offset2FileName(1024 * i)); + } + mappedFileQueue.destroy(); + + } + + @Test + public void testUpdatePathsOnline() { + final byte[] fixedMsg = new byte[1024]; + + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % storePaths.length; + assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue(); + + if (i == 500) { + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/"); + storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + } + } + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } + + @Test + public void testFullStorePath() { + final byte[] fixedMsg = new byte[1024]; + + Set fullStorePath = new HashSet<>(); + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, () -> fullStorePath); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + assertThat(storePaths.length).isEqualTo(3); + + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue(); + + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length); + assertThat(mappedFile.getFileName().startsWith(storePaths[1])).isTrue(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 2); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue(); + + fullStorePath.add("target/unit_test_store/b/"); + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 3); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue(); + + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 4); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue(); + + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } +} \ No newline at end of file