未验证 提交 5c7b113f 编写于 作者: Z Zhijia Cao 提交者: GitHub

[To rel/1.1] cp commits from master to solve the mods file OOM (#10928)

上级 780875b1
...@@ -257,6 +257,10 @@ public class IoTDBConstant { ...@@ -257,6 +257,10 @@ public class IoTDBConstant {
// compaction mods of previous version (<0.13) // compaction mods of previous version (<0.13)
public static final String COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD = "merge.mods"; public static final String COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD = "merge.mods";
public static final String SETTLE_SUFFIX = ".settle";
public static final String MODS_SETTLE_FILE_SUFFIX = ".mods.settle";
public static final String BLANK = "";
// write ahead log // write ahead log
public static final String WAL_FILE_PREFIX = "_"; public static final String WAL_FILE_PREFIX = "_";
public static final String WAL_FILE_SUFFIX = ".wal"; public static final String WAL_FILE_SUFFIX = ".wal";
......
...@@ -28,9 +28,16 @@ import org.slf4j.Logger; ...@@ -28,9 +28,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List; import java.util.List;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Stream;
import static org.apache.iotdb.commons.conf.IoTDBConstant.BLANK;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MODS_SETTLE_FILE_SUFFIX;
import static org.apache.iotdb.commons.conf.IoTDBConstant.SETTLE_SUFFIX;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD; import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD; import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD;
...@@ -87,18 +94,51 @@ public class CompactionRecoverManager { ...@@ -87,18 +94,51 @@ public class CompactionRecoverManager {
|| !Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) { || !Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
continue; continue;
} }
File[] compactionLogs = // recover temporary files generated during compacted
CompactionLogger.findCompactionLogs(isInnerSpace, timePartitionDir.getPath()); recoverCompaction(isInnerSpace, timePartitionDir);
for (File compactionLog : compactionLogs) {
logger.info("Calling compaction recover task."); // recover temporary files generated during .mods file settled
new CompactionRecoverTask( recoverModSettleFile(timePartitionDir.toPath());
logicalStorageGroupName, dataRegionId, tsFileManager, compactionLog, isInnerSpace)
.doCompaction();
}
} }
} }
} }
public void recoverModSettleFile(Path timePartitionDir) {
try (Stream<Path> settlesStream = Files.list(timePartitionDir)) {
settlesStream
.filter(path -> path.toString().endsWith(MODS_SETTLE_FILE_SUFFIX))
.forEach(
modsSettle -> {
Path originModFile =
modsSettle.resolveSibling(
modsSettle.getFileName().toString().replace(SETTLE_SUFFIX, BLANK));
try {
if (Files.exists(originModFile)) {
Files.deleteIfExists(modsSettle);
} else {
Files.move(modsSettle, originModFile);
}
} catch (IOException e) {
logger.error(
"recover mods file error on delete origin file or rename mods settle,", e);
}
});
} catch (IOException e) {
logger.error("recover mods file error on list files:{}", timePartitionDir, e);
}
}
public void recoverCompaction(boolean isInnerSpace, File timePartitionDir) {
File[] compactionLogs =
CompactionLogger.findCompactionLogs(isInnerSpace, timePartitionDir.getPath());
for (File compactionLog : compactionLogs) {
logger.info("Calling compaction recover task.");
new CompactionRecoverTask(
logicalStorageGroupName, dataRegionId, tsFileManager, compactionLog, isInnerSpace)
.doCompaction();
}
}
/** Check whether there is old compaction log from previous version (<0.13) and recover it. */ /** Check whether there is old compaction log from previous version (<0.13) and recover it. */
private void recoverCompactionBefore013(boolean isInnerSpace) { private void recoverCompactionBefore013(boolean isInnerSpace) {
String oldLogName = String oldLogName =
......
...@@ -46,6 +46,7 @@ import java.io.IOException; ...@@ -46,6 +46,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public class InnerSpaceCompactionTask extends AbstractCompactionTask { public class InnerSpaceCompactionTask extends AbstractCompactionTask {
...@@ -66,6 +67,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { ...@@ -66,6 +67,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
protected boolean[] isHoldingReadLock; protected boolean[] isHoldingReadLock;
protected boolean[] isHoldingWriteLock; protected boolean[] isHoldingWriteLock;
protected long maxModsFileSize;
public InnerSpaceCompactionTask( public InnerSpaceCompactionTask(
long timePartition, long timePartition,
TsFileManager tsFileManager, TsFileManager tsFileManager,
...@@ -339,6 +342,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { ...@@ -339,6 +342,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
sumOfCompactionCount = 0; sumOfCompactionCount = 0;
maxFileVersion = -1L; maxFileVersion = -1L;
maxCompactionCount = -1; maxCompactionCount = -1;
maxModsFileSize = 0;
if (selectedTsFileResourceList == null) { if (selectedTsFileResourceList == null) {
return; return;
} }
...@@ -354,6 +358,10 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { ...@@ -354,6 +358,10 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
if (fileName.getVersion() > maxFileVersion) { if (fileName.getVersion() > maxFileVersion) {
maxFileVersion = fileName.getVersion(); maxFileVersion = fileName.getVersion();
} }
if (!Objects.isNull(resource.getModFile())) {
long modsFileSize = resource.getModFile().getSize();
maxModsFileSize = Math.max(maxModsFileSize, modsFileSize);
}
} catch (IOException e) { } catch (IOException e) {
LOGGER.warn("Fail to get the tsfile name of {}", resource.getTsFile(), e); LOGGER.warn("Fail to get the tsfile name of {}", resource.getTsFile(), e);
} }
...@@ -380,6 +388,10 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { ...@@ -380,6 +388,10 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
return maxFileVersion; return maxFileVersion;
} }
public long getMaxModsFileSize() {
return maxModsFileSize;
}
@Override @Override
public String toString() { public String toString() {
return storageGroupName return storageGroupName
......
...@@ -62,6 +62,13 @@ public class DefaultCompactionTaskComparatorImpl implements ICompactionTaskCompa ...@@ -62,6 +62,13 @@ public class DefaultCompactionTaskComparatorImpl implements ICompactionTaskCompa
public int compareInnerSpaceCompactionTask( public int compareInnerSpaceCompactionTask(
InnerSpaceCompactionTask o1, InnerSpaceCompactionTask o2) { InnerSpaceCompactionTask o1, InnerSpaceCompactionTask o2) {
// if max mods file size of o1 and o2 are different
// we prefer to execute task with greater mods file
if (o1.getMaxModsFileSize() != o2.getMaxModsFileSize()) {
return o2.getMaxModsFileSize() > o1.getMaxModsFileSize() ? 1 : -1;
}
// if the sum of compaction count of the selected files are different // if the sum of compaction count of the selected files are different
// we prefer to execute task with smaller compaction count // we prefer to execute task with smaller compaction count
// this can reduce write amplification // this can reduce write amplification
......
...@@ -116,9 +116,15 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat ...@@ -116,9 +116,15 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
return 0; return 0;
} }
// it means the max size of a timeseries in this file when reading all of its chunk into memory. // it means the max size of a timeseries in this file when reading all of its chunk into memory.
return compressionRatio
* concurrentSeriesNum long resourceFileSize =
* (unseqResource.getTsFileSize() * fileInfo.maxSeriesChunkNum / fileInfo.totalChunkNum); compressionRatio
* concurrentSeriesNum
* (unseqResource.getTsFileSize() * fileInfo.maxSeriesChunkNum / fileInfo.totalChunkNum);
// add mod file size
long modFileSize = unseqResource.getModFile().getSize();
return resourceFileSize + modFileSize;
} }
/** /**
...@@ -162,6 +168,9 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat ...@@ -162,6 +168,9 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
cost += seqFileCost; cost += seqFileCost;
maxCostOfReadingSeqFile = seqFileCost; maxCostOfReadingSeqFile = seqFileCost;
} }
// add mod file size
cost += seqResource.getModFile().getSize();
} }
return cost; return cost;
} }
......
...@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; ...@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.schedule.comparator.ICompactionTaskComparator; import org.apache.iotdb.db.engine.compaction.schedule.comparator.ICompactionTaskComparator;
import org.apache.iotdb.db.engine.compaction.selector.IInnerSeqSpaceSelector; import org.apache.iotdb.db.engine.compaction.selector.IInnerSeqSpaceSelector;
import org.apache.iotdb.db.engine.compaction.selector.IInnerUnseqSpaceSelector; import org.apache.iotdb.db.engine.compaction.selector.IInnerUnseqSpaceSelector;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator; import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
...@@ -40,6 +41,7 @@ import java.util.Collections; ...@@ -40,6 +41,7 @@ import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue; import java.util.PriorityQueue;
/** /**
...@@ -64,6 +66,7 @@ public class SizeTieredCompactionSelector ...@@ -64,6 +66,7 @@ public class SizeTieredCompactionSelector
protected boolean sequence; protected boolean sequence;
protected TsFileManager tsFileManager; protected TsFileManager tsFileManager;
protected boolean hasNextTimePartition; protected boolean hasNextTimePartition;
private static final long MODS_FILE_SIZE_THRESHOLD = 1024 * 1024 * 50L;
public SizeTieredCompactionSelector( public SizeTieredCompactionSelector(
String storageGroupName, String storageGroupName,
...@@ -88,27 +91,23 @@ public class SizeTieredCompactionSelector ...@@ -88,27 +91,23 @@ public class SizeTieredCompactionSelector
* longer search for higher layers), otherwise it will return true. * longer search for higher layers), otherwise it will return true.
* *
* @param level the level to be searched * @param level the level to be searched
* @param taskPriorityQueue it stores the batches of files to be compacted and the total size of
* each batch
* @return return whether to continue the search to higher levels * @return return whether to continue the search to higher levels
* @throws IOException * @throws IOException
*/ */
private boolean selectLevelTask( @SuppressWarnings({"squid:S3776", "squid:S135"})
int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue) private List<Pair<List<TsFileResource>, Long>> selectSingleLevel(int level) throws IOException {
throws IOException {
boolean shouldContinueToSearch = true;
List<TsFileResource> selectedFileList = new ArrayList<>(); List<TsFileResource> selectedFileList = new ArrayList<>();
long selectedFileSize = 0L; long selectedFileSize = 0L;
long targetCompactionFileSize = config.getTargetCompactionFileSize(); long targetCompactionFileSize = config.getTargetCompactionFileSize();
List<Pair<List<TsFileResource>, Long>> taskList = new ArrayList<>();
for (TsFileResource currentFile : tsFileResources) { for (TsFileResource currentFile : tsFileResources) {
TsFileNameGenerator.TsFileName currentName = TsFileNameGenerator.TsFileName currentName =
TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName()); TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
if (currentName.getInnerCompactionCnt() != level) { if (currentName.getInnerCompactionCnt() != level) {
// meet files of another level // meet files of another level
if (selectedFileList.size() > 1) { if (selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); taskList.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
} }
selectedFileList = new ArrayList<>(); selectedFileList = new ArrayList<>();
selectedFileSize = 0L; selectedFileSize = 0L;
...@@ -132,8 +131,7 @@ public class SizeTieredCompactionSelector ...@@ -132,8 +131,7 @@ public class SizeTieredCompactionSelector
|| selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) { || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
// submit the task // submit the task
if (selectedFileList.size() > 1) { if (selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); taskList.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
} }
selectedFileList = new ArrayList<>(); selectedFileList = new ArrayList<>();
selectedFileSize = 0L; selectedFileSize = 0L;
...@@ -143,17 +141,19 @@ public class SizeTieredCompactionSelector ...@@ -143,17 +141,19 @@ public class SizeTieredCompactionSelector
// if next time partition exists // if next time partition exists
// submit a merge task even it does not meet the requirement for file num or file size // submit a merge task even it does not meet the requirement for file num or file size
if (hasNextTimePartition && selectedFileList.size() > 1) { if (hasNextTimePartition && selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); taskList.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
} }
return shouldContinueToSearch; return taskList;
} }
/** /**
* This method searches for a batch of files to be compacted from layer 0 to the highest layer. If * This method is used to select a batch of files to be merged. There are two ways to select
* there are more than a batch of files to be merged on a certain layer, it does not search to * files.If the first method selects the appropriate file, the second method is not executed. The
* higher layers. It creates a compaction thread for each batch of files and put it into the * first one is based on the mods file corresponding to the file. We will preferentially select
* candidateCompactionTaskQueue of the {@link CompactionTaskManager}. * file with mods file larger than 50M. The second way is based on the file layer from layer 0 to
* the highest layer. If there are more than a batch of files to be merged on a certain layer, it
* does not search to higher layers. It creates a compaction thread for each batch of files and
* put it into the candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
* *
* @return Returns whether the file was found and submits the merge task * @return Returns whether the file was found and submits the merge task
*/ */
...@@ -163,12 +163,14 @@ public class SizeTieredCompactionSelector ...@@ -163,12 +163,14 @@ public class SizeTieredCompactionSelector
PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue = PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
new PriorityQueue<>(new SizeTieredCompactionTaskComparator()); new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
try { try {
int maxLevel = searchMaxFileLevel(); // preferentially select files based on mods file size
for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) { taskPriorityQueue.addAll(selectMaxModsFileTask());
if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
break; // if a suitable file is not selected in the first step, select the file at the tsfile level
} if (taskPriorityQueue.isEmpty()) {
taskPriorityQueue.addAll(selectLevelTask());
} }
List<List<TsFileResource>> taskList = new LinkedList<>(); List<List<TsFileResource>> taskList = new LinkedList<>();
while (taskPriorityQueue.size() > 0) { while (taskPriorityQueue.size() > 0) {
List<TsFileResource> resources = taskPriorityQueue.poll().left; List<TsFileResource> resources = taskPriorityQueue.poll().left;
...@@ -181,6 +183,32 @@ public class SizeTieredCompactionSelector ...@@ -181,6 +183,32 @@ public class SizeTieredCompactionSelector
return Collections.emptyList(); return Collections.emptyList();
} }
private List<Pair<List<TsFileResource>, Long>> selectLevelTask() throws IOException {
List<Pair<List<TsFileResource>, Long>> taskList = new ArrayList<>();
int maxLevel = searchMaxFileLevel();
for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
List<Pair<List<TsFileResource>, Long>> singleLevelTask = selectSingleLevel(currentLevel);
if (!singleLevelTask.isEmpty()) {
taskList.addAll(singleLevelTask);
break;
}
}
return taskList;
}
private List<Pair<List<TsFileResource>, Long>> selectMaxModsFileTask() {
List<Pair<List<TsFileResource>, Long>> taskList = new ArrayList<>();
for (TsFileResource tsFileResource : tsFileResources) {
ModificationFile modFile = tsFileResource.getModFile();
if (!Objects.isNull(modFile) && modFile.getSize() > MODS_FILE_SIZE_THRESHOLD) {
taskList.add(
new Pair<>(Collections.singletonList(tsFileResource), tsFileResource.getTsFileSize()));
LOGGER.debug("select tsfile {},the mod file size is {}", tsFileResource, modFile.getSize());
}
}
return taskList;
}
private int searchMaxFileLevel() throws IOException { private int searchMaxFileLevel() throws IOException {
int maxLevel = -1; int maxLevel = -1;
for (TsFileResource currentFile : tsFileResources) { for (TsFileResource currentFile : tsFileResources) {
......
...@@ -44,6 +44,10 @@ public class Deletion extends Modification implements Cloneable { ...@@ -44,6 +44,10 @@ public class Deletion extends Modification implements Cloneable {
public Deletion(PartialPath path, long fileOffset, long endTime) { public Deletion(PartialPath path, long fileOffset, long endTime) {
super(Type.DELETION, path, fileOffset); super(Type.DELETION, path, fileOffset);
this.timeRange = new TimeRange(Long.MIN_VALUE, endTime); this.timeRange = new TimeRange(Long.MIN_VALUE, endTime);
this.timeRange.setLeftClose(false);
if (endTime == Long.MAX_VALUE) {
this.timeRange.setRightClose(false);
}
} }
/** /**
...@@ -56,6 +60,12 @@ public class Deletion extends Modification implements Cloneable { ...@@ -56,6 +60,12 @@ public class Deletion extends Modification implements Cloneable {
public Deletion(PartialPath path, long fileOffset, long startTime, long endTime) { public Deletion(PartialPath path, long fileOffset, long startTime, long endTime) {
super(Type.DELETION, path, fileOffset); super(Type.DELETION, path, fileOffset);
this.timeRange = new TimeRange(startTime, endTime); this.timeRange = new TimeRange(startTime, endTime);
if (startTime == Long.MIN_VALUE) {
this.timeRange.setLeftClose(false);
}
if (endTime == Long.MAX_VALUE) {
this.timeRange.setRightClose(false);
}
} }
public long getStartTime() { public long getStartTime() {
......
...@@ -36,8 +36,13 @@ import java.io.IOException; ...@@ -36,8 +36,13 @@ import java.io.IOException;
import java.nio.file.FileAlreadyExistsException; import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
/** /**
* ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same * ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same
...@@ -47,6 +52,7 @@ public class ModificationFile implements AutoCloseable { ...@@ -47,6 +52,7 @@ public class ModificationFile implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ModificationFile.class); private static final Logger logger = LoggerFactory.getLogger(ModificationFile.class);
public static final String FILE_SUFFIX = ".mods"; public static final String FILE_SUFFIX = ".mods";
public static final String COMPACT_SUFFIX = ".settle";
public static final String COMPACTION_FILE_SUFFIX = ".compaction.mods"; public static final String COMPACTION_FILE_SUFFIX = ".compaction.mods";
// whether to verify the last line, it may be incomplete in extreme cases // whether to verify the last line, it may be incomplete in extreme cases
...@@ -57,6 +63,9 @@ public class ModificationFile implements AutoCloseable { ...@@ -57,6 +63,9 @@ public class ModificationFile implements AutoCloseable {
private String filePath; private String filePath;
private Random random = new Random(); private Random random = new Random();
private static final long COMPACT_THRESHOLD = 1024 * 1024;
private boolean hasCompacted = false;
/** /**
* Construct a ModificationFile using a file as its storage. * Construct a ModificationFile using a file as its storage.
* *
...@@ -124,7 +133,10 @@ public class ModificationFile implements AutoCloseable { ...@@ -124,7 +133,10 @@ public class ModificationFile implements AutoCloseable {
public void remove() throws IOException { public void remove() throws IOException {
close(); close();
FSFactoryProducer.getFSFactory().getFile(filePath).delete(); boolean deleted = FSFactoryProducer.getFSFactory().getFile(filePath).delete();
if (!deleted) {
logger.warn("Delete ModificationFile {} failed.", filePath);
}
} }
public boolean exists() { public boolean exists() {
...@@ -177,4 +189,78 @@ public class ModificationFile implements AutoCloseable { ...@@ -177,4 +189,78 @@ public class ModificationFile implements AutoCloseable {
return 0; return 0;
} }
} }
public void compact() {
long originFileSize = getSize();
if (originFileSize > COMPACT_THRESHOLD && !hasCompacted) {
Map<String, List<Modification>> pathModificationMap =
getModifications().stream().collect(Collectors.groupingBy(Modification::getPathString));
String newModsFileName = filePath + COMPACT_SUFFIX;
try (ModificationFile compactedModificationFile = new ModificationFile(newModsFileName)) {
Set<Map.Entry<String, List<Modification>>> modificationsEntrySet =
pathModificationMap.entrySet();
for (Map.Entry<String, List<Modification>> modificationEntry : modificationsEntrySet) {
List<Modification> settledModifications = sortAndMerge(modificationEntry.getValue());
for (Modification settledModification : settledModifications) {
compactedModificationFile.write(settledModification);
}
}
} catch (IOException e) {
logger.error("compact mods file exception of {}", filePath, e);
}
try {
// remove origin mods file
this.remove();
// rename new mods file to origin name
Files.move(new File(newModsFileName).toPath(), new File(filePath).toPath());
logger.info("{} settle successful", filePath);
if (getSize() > COMPACT_THRESHOLD) {
logger.warn(
"After the mod file is settled, the file size is still greater than 1M,the size of the file before settle is {},after settled the file size is {}",
originFileSize,
getSize());
}
} catch (IOException e) {
logger.error("remove origin file or rename new mods file error.", e);
}
hasCompacted = true;
}
}
public static List<Modification> sortAndMerge(List<Modification> modifications) {
modifications.sort(
(o1, o2) -> {
if (!o1.getType().equals(o2.getType())) {
return o1.getType().compareTo(o2.getType());
} else if (!o1.getPath().equals(o2.getPath())) {
return o1.getPath().compareTo(o2.getPath());
} else if (o1.getFileOffset() != o2.getFileOffset()) {
return (int) (o1.getFileOffset() - o2.getFileOffset());
} else {
if (o1.getType() == Modification.Type.DELETION) {
Deletion del1 = (Deletion) o1;
Deletion del2 = (Deletion) o2;
return del1.getTimeRange().compareTo(del2.getTimeRange());
}
throw new IllegalArgumentException();
}
});
List<Modification> result = new ArrayList<>();
if (!modifications.isEmpty()) {
Deletion current = ((Deletion) modifications.get(0)).clone();
for (int i = 1; i < modifications.size(); i++) {
Deletion del = (Deletion) modifications.get(i);
if (current.intersects(del)) {
current.merge(del);
} else {
result.add(current);
current = del.clone();
}
}
result.add(current);
}
return result;
}
} }
...@@ -2055,6 +2055,8 @@ public class DataRegion implements IDataRegionForQuery { ...@@ -2055,6 +2055,8 @@ public class DataRegion implements IDataRegionForQuery {
tsFileResource.getModFile().write(deletion); tsFileResource.getModFile().write(deletion);
// remember to close mod file // remember to close mod file
tsFileResource.getModFile().close(); tsFileResource.getModFile().close();
// if file length greater than 1M,execute compact.
modFile.compact();
if (!modFileExists) { if (!modFileExists) {
TsFileMetricManager.getInstance().increaseModFileNum(1); TsFileMetricManager.getInstance().increaseModFileNum(1);
} }
......
...@@ -22,7 +22,6 @@ package org.apache.iotdb.db.query.context; ...@@ -22,7 +22,6 @@ package org.apache.iotdb.db.query.context;
import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.metadata.path.PatternTreeMapFactory; import org.apache.iotdb.db.metadata.path.PatternTreeMapFactory;
...@@ -113,45 +112,10 @@ public class QueryContext { ...@@ -113,45 +112,10 @@ public class QueryContext {
} }
fileModCache.put(modFile.getFilePath(), allModifications); fileModCache.put(modFile.getFilePath(), allModifications);
} }
return sortAndMerge(allModifications.getOverlapped(path)); return ModificationFile.sortAndMerge(allModifications.getOverlapped(path));
}); });
} }
private List<Modification> sortAndMerge(List<Modification> modifications) {
modifications.sort(
(o1, o2) -> {
if (!o1.getType().equals(o2.getType())) {
return o1.getType().compareTo(o2.getType());
} else if (!o1.getPath().equals(o2.getPath())) {
return o1.getPath().compareTo(o2.getPath());
} else if (o1.getFileOffset() != o2.getFileOffset()) {
return (int) (o1.getFileOffset() - o2.getFileOffset());
} else {
if (o1.getType() == Modification.Type.DELETION) {
Deletion del1 = (Deletion) o1;
Deletion del2 = (Deletion) o2;
return del1.getTimeRange().compareTo(del2.getTimeRange());
}
throw new IllegalArgumentException();
}
});
List<Modification> result = new ArrayList<>();
if (!modifications.isEmpty()) {
Deletion current = ((Deletion) modifications.get(0)).clone();
for (int i = 1; i < modifications.size(); i++) {
Deletion del = (Deletion) modifications.get(i);
if (current.intersects(del)) {
current.merge(del);
} else {
result.add(current);
current = del.clone();
}
}
result.add(current);
}
return result;
}
/** /**
* Find the modifications of all aligned 'paths' in 'modFile'. If they are not in the cache, read * Find the modifications of all aligned 'paths' in 'modFile'. If they are not in the cache, read
* them from 'modFile' and put then into the cache. * them from 'modFile' and put then into the cache.
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
*/ */
package org.apache.iotdb.db.engine.compaction; package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.execute.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.engine.compaction.execute.performer.impl.ReadPointCompactionPerformer;
...@@ -28,6 +30,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; ...@@ -28,6 +30,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl; import org.apache.iotdb.db.engine.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl;
import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionPriority; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionPriority;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer; import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
...@@ -41,7 +44,9 @@ import org.slf4j.Logger; ...@@ -41,7 +44,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -374,6 +379,40 @@ public class CompactionTaskComparatorTest { ...@@ -374,6 +379,40 @@ public class CompactionTaskComparatorTest {
} }
} }
@Test
public void testCompareByMaxModsFileSize()
throws InterruptedException, IllegalPathException, IOException {
for (int i = 0; i < 100; ++i) {
List<TsFileResource> resources = new ArrayList<>();
for (int j = i; j < 100; ++j) {
resources.add(
new FakedTsFileResource(new File(String.format("%d-%d-0-0.tsfile", i + j, i + j)), j));
}
FakedInnerSpaceCompactionTask innerTask =
new FakedInnerSpaceCompactionTask(
"fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
compactionTaskQueue.put(innerTask);
}
String targetFileName = "101-101-0-0.tsfile";
FakedTsFileResource fakedTsFileResource =
new FakedTsFileResource(new File(targetFileName), 100);
fakedTsFileResource.getModFile().write(new Deletion(new PartialPath("root.test.d1"), 1, 1));
compactionTaskQueue.put(
new FakedInnerSpaceCompactionTask(
"fakeSg",
0,
tsFileManager,
taskNum,
true,
Collections.singletonList(fakedTsFileResource),
0));
FakedInnerSpaceCompactionTask task = (FakedInnerSpaceCompactionTask) compactionTaskQueue.take();
Assert.assertEquals(
targetFileName, task.getSelectedTsFileResourceList().get(0).getTsFile().getName());
fakedTsFileResource.getModFile().remove();
}
private static class FakedInnerSpaceCompactionTask extends InnerSpaceCompactionTask { private static class FakedInnerSpaceCompactionTask extends InnerSpaceCompactionTask {
public FakedInnerSpaceCompactionTask( public FakedInnerSpaceCompactionTask(
......
...@@ -19,12 +19,15 @@ ...@@ -19,12 +19,15 @@
package org.apache.iotdb.db.engine.compaction.inner; package org.apache.iotdb.db.engine.compaction.inner;
import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest; import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest;
import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.selector.impl.SizeTieredCompactionSelector; import org.apache.iotdb.db.engine.compaction.selector.impl.SizeTieredCompactionSelector;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.StorageEngineException;
...@@ -41,6 +44,8 @@ import java.util.List; ...@@ -41,6 +44,8 @@ import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest { public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest {
@Before @Before
public void setUp() public void setUp()
...@@ -623,4 +628,32 @@ public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest { ...@@ -623,4 +628,32 @@ public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest {
Assert.fail(); Assert.fail();
} }
} }
@Test
public void testSelectWhenModsFileGreaterThan50M()
throws IOException, MetadataException, WriteProcessException {
createFiles(6, 2, 3, 50, 0, 10000, 50, 50, false, true);
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
TsFileResource tsFileResource = seqResources.get(0);
ModificationFile modFile = tsFileResource.getModFile();
while (modFile.getSize() < 1024 * 1024 * 50) {
modFile.write(
new Deletion(
new PartialPath(COMPACTION_TEST_SG + PATH_SEPARATOR + "**"),
Long.MIN_VALUE,
Long.MAX_VALUE));
}
SizeTieredCompactionSelector selector =
new SizeTieredCompactionSelector("", "", 0, true, tsFileManager);
// copy candidate source file list
List<TsFileResource> resources = tsFileManager.getOrCreateSequenceListByTimePartition(0);
List<List<TsFileResource>> taskResource = selector.selectInnerSpaceTask(resources);
Assert.assertEquals(1, taskResource.size());
modFile.remove();
}
} }
...@@ -21,14 +21,19 @@ package org.apache.iotdb.db.engine.modification; ...@@ -21,14 +21,19 @@ package org.apache.iotdb.db.engine.modification;
import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.compaction.execute.recover.CompactionRecoverManager;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
public class ModificationFileTest { public class ModificationFileTest {
...@@ -99,4 +104,181 @@ public class ModificationFileTest { ...@@ -99,4 +104,181 @@ public class ModificationFileTest {
new File(tempFileName).delete(); new File(tempFileName).delete();
} }
} }
// test if file size greater than 1M.
@Test
public void testCompact01() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact01.mods");
long time = 1000;
try (ModificationFile modificationFile = new ModificationFile(tempFileName)) {
while (modificationFile.getSize() < 1024 * 1024) {
modificationFile.write(
new Deletion(
new PartialPath(new String[] {"root", "sg", "d1"}),
1000,
Long.MIN_VALUE,
time += 5000));
}
modificationFile.compact();
List<Modification> modificationList = new ArrayList<>(modificationFile.getModifications());
assertEquals(1, modificationList.size());
Deletion deletion = (Deletion) modificationList.get(0);
assertEquals(time, deletion.getEndTime());
assertEquals(Long.MIN_VALUE, deletion.getStartTime());
} catch (IOException e) {
fail(e.getMessage());
} finally {
new File(tempFileName).delete();
}
}
// test if file size less than 1M.
@Test
public void testCompact02() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact02.mods");
long time = 1000;
try (ModificationFile modificationFile = new ModificationFile(tempFileName)) {
while (modificationFile.getSize() < 1024 * 100) {
modificationFile.write(
new Deletion(
new PartialPath(new String[] {"root", "sg", "d1"}),
1000,
Long.MIN_VALUE,
time += 5000));
}
modificationFile.compact();
List<Modification> modificationList = new ArrayList<>(modificationFile.getModifications());
assertTrue(modificationList.size() > 1);
} catch (IOException e) {
fail(e.getMessage());
} finally {
new File(tempFileName).delete();
}
}
// test if file size greater than 1M.
@Test
public void testCompact03() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact03.mods");
try (ModificationFile modificationFile = new ModificationFile(tempFileName)) {
while (modificationFile.getSize() < 1024 * 1024) {
modificationFile.write(
new Deletion(
new PartialPath(new String[] {"root", "sg", "d1"}),
1000,
Long.MIN_VALUE,
Long.MAX_VALUE));
}
modificationFile.compact();
List<Modification> modificationList = new ArrayList<>(modificationFile.getModifications());
assertEquals(1, modificationList.size());
Deletion deletion = (Deletion) modificationList.get(0);
assertEquals(Long.MAX_VALUE, deletion.getEndTime());
assertEquals(Long.MIN_VALUE, deletion.getStartTime());
} catch (IOException e) {
fail(e.getMessage());
} finally {
new File(tempFileName).delete();
}
}
@Test
public void testCompact04() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact04.mods");
try (ModificationFile modificationFile = new ModificationFile(tempFileName)) {
long time = 0;
while (modificationFile.getSize() < 1024 * 1024) {
for (int i = 0; i < 5; i++) {
modificationFile.write(
new Deletion(
new PartialPath(new String[] {"root", "sg", "d1"}),
1000,
Long.MIN_VALUE,
time += 5000));
modificationFile.write(
new Deletion(
new PartialPath(new String[] {"root", "sg", "*"}),
1000,
Long.MIN_VALUE,
time += 5000));
}
}
modificationFile.compact();
List<Modification> modificationList = new ArrayList<>(modificationFile.getModifications());
assertEquals(2, modificationList.size());
} catch (IOException e) {
fail(e.getMessage());
} finally {
new File(tempFileName).delete();
}
}
// test mods file and mods settle file both exists
@Test
public void testRecover01() {
String modsFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact01.mods");
String modsSettleFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact01.mods.settle");
try (ModificationFile modsFile = new ModificationFile(modsFileName);
ModificationFile modsSettleFile = new ModificationFile(modsSettleFileName)) {
modsFile.write(
new Deletion(
new PartialPath(new String[] {"root", "sg", "d1"}),
1000,
Long.MIN_VALUE,
Long.MAX_VALUE));
modsSettleFile.write(
new Deletion(
new PartialPath(new String[] {"root", "sg", "d1"}),
1000,
Long.MIN_VALUE,
Long.MAX_VALUE));
modsFile.close();
modsSettleFile.close();
new CompactionRecoverManager(null, null, null)
.recoverModSettleFile(new File(TestConstant.BASE_OUTPUT_PATH).toPath());
Assert.assertTrue(modsFile.exists());
Assert.assertFalse(modsSettleFile.exists());
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
Files.delete(new File(modsFileName).toPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
// test only mods settle file exists
@Test
public void testRecover02() {
String modsSettleFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact02.mods.settle");
String originModsFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact02.mods");
try (ModificationFile modsSettleFile = new ModificationFile(modsSettleFileName)) {
modsSettleFile.write(
new Deletion(
new PartialPath(new String[] {"root", "sg", "d1"}),
1000,
Long.MIN_VALUE,
Long.MAX_VALUE));
modsSettleFile.close();
new CompactionRecoverManager(null, null, null)
.recoverModSettleFile(new File(TestConstant.BASE_OUTPUT_PATH).toPath());
Assert.assertFalse(modsSettleFile.exists());
Assert.assertTrue(new File(originModsFileName).exists());
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
Files.delete(new File(originModsFileName).toPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
} }
...@@ -33,6 +33,7 @@ public class FakedTsFileResource extends TsFileResource { ...@@ -33,6 +33,7 @@ public class FakedTsFileResource extends TsFileResource {
private String fakeTsfileName; private String fakeTsfileName;
public FakedTsFileResource(long tsFileSize, String name) { public FakedTsFileResource(long tsFileSize, String name) {
super(new File(name));
this.timeIndex = new FileTimeIndex(); this.timeIndex = new FileTimeIndex();
this.tsFileSize = tsFileSize; this.tsFileSize = tsFileSize;
fakeTsfileName = name; fakeTsfileName = name;
......
...@@ -448,4 +448,45 @@ public class TimeRangeTest { ...@@ -448,4 +448,45 @@ public class TimeRangeTest {
Assert.assertTrue(new TimeRange(0, 3).compareTo(new TimeRange(1, 2)) < 0); Assert.assertTrue(new TimeRange(0, 3).compareTo(new TimeRange(1, 2)) < 0);
Assert.assertTrue(new TimeRange(5, 6).compareTo(new TimeRange(5, 6)) == 0); Assert.assertTrue(new TimeRange(5, 6).compareTo(new TimeRange(5, 6)) == 0);
} }
@Test
/*
* test min is Long.MIN_VALUE
*/
public void intersect8() {
TimeRange r1 = new TimeRange(Long.MIN_VALUE, 3);
r1.setLeftClose(false);
TimeRange r2 = new TimeRange(Long.MIN_VALUE, 5);
r2.setLeftClose(false);
assertTrue(r1.intersects(r2));
assertTrue(r2.intersects(r1));
}
@Test
/*
* test max is Long.MAX_VALUE
*/
public void intersect9() {
TimeRange r1 = new TimeRange(1, Long.MAX_VALUE);
r1.setRightClose(false);
TimeRange r2 = new TimeRange(3, Long.MAX_VALUE);
r2.setRightClose(false);
assertTrue(r1.intersects(r2));
assertTrue(r2.intersects(r1));
}
@Test
/*
* test min is Long.MIN_VALUE and max is Long.MAX_VALUE
*/
public void intersect10() {
TimeRange r1 = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
r1.setLeftClose(false);
r1.setRightClose(false);
TimeRange r2 = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
r2.setLeftClose(false);
r2.setRightClose(false);
assertTrue(r1.intersects(r2));
assertTrue(r2.intersects(r1));
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册