Unverified commit c307a455 authored by: S shuwenwei, committed by: GitHub

Reimplement compaction memory estimator (#10951)

Parent f80c2894
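This change replaces the read-point cross-space estimator with FastCrossSpaceCompactionEstimator and reworks the shared estimation logic: the metadata cost is now capped by the total chunk-metadata size, and the data cost is derived from the average uncompressed chunk size, the number of concurrently compacted series, and the maximum number of overlapping files per device. A minimal usage sketch, modeled on the new test at the end of this diff (seqResources and unseqResources are assumed to be already-selected TsFileResource lists; IOException handling is elided):

// Sketch: estimate the memory cost of a cross-space task; close() clears the cached indexes.
try (AbstractCrossSpaceEstimator estimator = new FastCrossSpaceCompactionEstimator()) {
  long cost = estimator.estimateCrossCompactionMemory(seqResources, unseqResources);
  if (cost < 0) {
    // -1 means a source file was deleted during selection, so the estimate is invalid
  }
}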
......@@ -21,7 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadPointCrossCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCrossSpaceCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossCompactionTaskResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
......@@ -66,7 +66,7 @@ public interface ICompactionSelector {
case READ_POINT:
case FAST:
if (!isInnerSpace) {
return new ReadPointCrossCompactionEstimator();
return new FastCrossSpaceCompactionEstimator();
}
default:
throw new RuntimeException(
......
......@@ -23,56 +23,121 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Estimate the memory cost of one compaction task with specific source files, based on the
* concrete estimator implementation.
*/
public abstract class AbstractCompactionEstimator {
public abstract class AbstractCompactionEstimator implements Closeable {
protected Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
protected Map<TsFileResource, FileInfo> fileInfoCache = new HashMap<>();
protected Map<TsFileResource, DeviceTimeIndex> deviceTimeIndexCache = new HashMap<>();
protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
protected long compressionRatio = (long) CompressionRatio.getInstance().getRatio() + 1;
/**
* Estimate the memory cost of compacting the unseq file and its corresponding overlapped seq
* files in a cross-space compaction task.
*
* @throws IOException if an I/O error occurs
*/
public abstract long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException;
protected abstract long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo);
/** Estimate the memory cost of compacting the source files in an inner-space compaction task. */
public abstract long estimateInnerCompactionMemory(List<TsFileResource> resources)
throws IOException;
protected abstract long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) throws IOException;
/**
* Construct a new TsFileSequenceReader for a TsFile, or return an existing one.
*
* @throws IOException if an I/O error occurs
*/
protected TsFileSequenceReader getFileReader(TsFileResource tsFileResource) throws IOException {
TsFileSequenceReader reader = fileReaderCache.get(tsFileResource);
if (reader == null) {
reader = new TsFileSequenceReader(tsFileResource.getTsFilePath(), true, false);
fileReaderCache.put(tsFileResource, reader);
protected CompactionTaskInfo calculatingCompactionTaskInfo(List<TsFileResource> resources)
throws IOException {
List<FileInfo> fileInfoList = new ArrayList<>();
for (TsFileResource resource : resources) {
FileInfo fileInfo = getFileInfoFromCache(resource);
fileInfoList.add(fileInfo);
}
return reader;
return new CompactionTaskInfo(resources, fileInfoList);
}
public void close() throws IOException {
for (TsFileSequenceReader reader : fileReaderCache.values()) {
reader.close();
private FileInfo getFileInfoFromCache(TsFileResource resource) throws IOException {
if (fileInfoCache.containsKey(resource)) {
return fileInfoCache.get(resource);
}
try (TsFileSequenceReader reader =
new TsFileSequenceReader(resource.getTsFilePath(), true, false)) {
FileInfo fileInfo = CompactionEstimateUtils.calculateFileInfo(reader);
fileInfoCache.put(resource, fileInfo);
return fileInfo;
}
}
protected int calculatingMaxOverlapFileNumInSubCompactionTask(List<TsFileResource> resources)
throws IOException {
Set<String> devices = new HashSet<>();
List<DeviceTimeIndex> resourceDevices = new ArrayList<>(resources.size());
for (TsFileResource resource : resources) {
DeviceTimeIndex deviceTimeIndex = getDeviceTimeIndexFromCache(resource);
devices.addAll(deviceTimeIndex.getDevices());
resourceDevices.add(deviceTimeIndex);
}
int maxOverlapFileNumInSubCompactionTask = 1;
for (String device : devices) {
List<DeviceTimeIndex> resourcesContainsCurrentDevice =
resourceDevices.stream()
.filter(resource -> !resource.definitelyNotContains(device))
.sorted(Comparator.comparingLong(resource -> resource.getStartTime(device)))
.collect(Collectors.toList());
if (resourcesContainsCurrentDevice.size() < maxOverlapFileNumInSubCompactionTask) {
continue;
}
long maxEndTimeOfCurrentDevice = Long.MIN_VALUE;
int overlapFileNumOfCurrentDevice = 0;
for (DeviceTimeIndex resource : resourcesContainsCurrentDevice) {
long deviceStartTimeInCurrentFile = resource.getStartTime(device);
long deviceEndTimeInCurrentFile = resource.getEndTime(device);
if (deviceStartTimeInCurrentFile <= maxEndTimeOfCurrentDevice) {
// has overlap, update max end time
maxEndTimeOfCurrentDevice =
Math.max(maxEndTimeOfCurrentDevice, deviceEndTimeInCurrentFile);
overlapFileNumOfCurrentDevice++;
maxOverlapFileNumInSubCompactionTask =
Math.max(maxOverlapFileNumInSubCompactionTask, overlapFileNumOfCurrentDevice);
} else {
// reset max end time and overlap file num of current device
maxEndTimeOfCurrentDevice = deviceEndTimeInCurrentFile;
overlapFileNumOfCurrentDevice = 1;
}
}
// the overlap count already reached its maximum possible value
if (maxOverlapFileNumInSubCompactionTask == resources.size()) {
return maxOverlapFileNumInSubCompactionTask;
}
}
fileReaderCache.clear();
return maxOverlapFileNumInSubCompactionTask;
}
private DeviceTimeIndex getDeviceTimeIndexFromCache(TsFileResource resource) throws IOException {
if (deviceTimeIndexCache.containsKey(resource)) {
return deviceTimeIndexCache.get(resource);
}
ITimeIndex timeIndex = resource.getTimeIndex();
if (timeIndex instanceof FileTimeIndex) {
timeIndex = resource.buildDeviceTimeIndex();
}
deviceTimeIndexCache.put(resource, (DeviceTimeIndex) timeIndex);
return (DeviceTimeIndex) timeIndex;
}
public void close() throws IOException {
deviceTimeIndexCache.clear();
fileInfoCache.clear();
}
}
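The overlap scan in calculatingMaxOverlapFileNumInSubCompactionTask sorts, per device, the files containing that device by start time and counts a run of files whose start time falls at or before the running maximum end time. A short worked trace with hypothetical timestamps:

// Hypothetical trace for one device d1 present in three files:
//   f1: [0, 100], f2: [50, 120], f3: [130, 200]   (already sorted by start time)
// f1: start 0 > maxEnd (Long.MIN_VALUE) -> reset:   count = 1, maxEnd = 100
// f2: start 50 <= maxEnd (100)          -> overlap: count = 2, maxEnd = 120
// f3: start 130 > maxEnd (120)          -> reset:   count = 1, maxEnd = 200
// result for d1 is 2; the method returns the maximum of this value over all devices.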
......@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimat
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
......@@ -29,11 +30,27 @@ import java.util.List;
* its corresponding implementation.
*/
public abstract class AbstractCrossSpaceEstimator extends AbstractCompactionEstimator {
public abstract long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException;
public long estimateInnerCompactionMemory(List<TsFileResource> resources) {
throw new RuntimeException(
"This kind of estimator cannot be used to estimate inner space compaction task");
public long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
if (!config.isEnableCompactionMemControl()) {
return 0;
}
List<TsFileResource> resources = new ArrayList<>(seqResources.size() + unseqResources.size());
resources.addAll(seqResources);
resources.addAll(unseqResources);
if (!CompactionEstimateUtils.addReadLock(resources)) {
return -1L;
}
long cost = 0;
try {
CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
cost += calculatingMetadataMemoryCost(taskInfo);
cost += calculatingDataMemoryCost(taskInfo);
} finally {
CompactionEstimateUtils.releaseReadLock(resources);
}
return cost;
}
}
......@@ -19,14 +19,9 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
......@@ -34,78 +29,14 @@ import java.util.List;
* its corresponding implementation.
*/
public abstract class AbstractInnerSpaceEstimator extends AbstractCompactionEstimator {
protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
public long estimateInnerCompactionMemory(List<TsFileResource> resources) throws IOException {
InnerCompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
if (!config.isEnableCompactionMemControl()) {
return 0;
}
CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
long cost = calculatingMetadataMemoryCost(taskInfo);
cost += calculatingDataMemoryCost(taskInfo);
return cost;
}
public abstract long calculatingMetadataMemoryCost(InnerCompactionTaskInfo taskInfo);
public abstract long calculatingDataMemoryCost(InnerCompactionTaskInfo taskInfo);
public long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException {
throw new RuntimeException(
"This kind of estimator cannot be used to estimate cross space compaction task");
}
protected InnerCompactionTaskInfo calculatingCompactionTaskInfo(List<TsFileResource> resources)
throws IOException {
List<FileInfo> fileInfoList = new ArrayList<>();
for (TsFileResource resource : resources) {
TsFileSequenceReader reader = getFileReader(resource);
FileInfo fileInfo = CompactionEstimateUtils.getSeriesAndDeviceChunkNum(reader);
fileInfoList.add(fileInfo);
}
return new InnerCompactionTaskInfo(resources, fileInfoList);
}
protected static class InnerCompactionTaskInfo {
private final List<FileInfo> fileInfoList;
private int maxConcurrentSeriesNum = 1;
private long maxChunkMetadataSize = 0;
private int maxChunkMetadataNumInDevice = 0;
private long modificationFileSize = 0;
protected InnerCompactionTaskInfo(List<TsFileResource> resources, List<FileInfo> fileInfoList) {
this.fileInfoList = fileInfoList;
for (TsFileResource resource : resources) {
ModificationFile modificationFile = resource.getModFile();
if (modificationFile.exists()) {
modificationFileSize += modificationFile.getSize();
}
}
for (FileInfo fileInfo : fileInfoList) {
maxConcurrentSeriesNum =
Math.max(maxConcurrentSeriesNum, fileInfo.maxAlignedSeriesNumInDevice);
maxChunkMetadataNumInDevice =
Math.max(maxChunkMetadataNumInDevice, fileInfo.maxDeviceChunkNum);
maxChunkMetadataSize = Math.max(maxChunkMetadataSize, fileInfo.averageChunkMetadataSize);
}
}
public int getMaxChunkMetadataNumInDevice() {
return maxChunkMetadataNumInDevice;
}
public long getMaxChunkMetadataSize() {
return maxChunkMetadataSize;
}
public List<FileInfo> getFileInfoList() {
return fileInfoList;
}
public int getMaxConcurrentSeriesNum() {
return maxConcurrentSeriesNum;
}
public long getModificationFileSize() {
return modificationFileSize;
}
}
}
......@@ -43,8 +43,7 @@ public class CompactionEstimateUtils {
*
* @throws IOException if an I/O error occurs
*/
public static FileInfo getSeriesAndDeviceChunkNum(TsFileSequenceReader reader)
throws IOException {
public static FileInfo calculateFileInfo(TsFileSequenceReader reader) throws IOException {
int totalChunkNum = 0;
int maxChunkNum = 0;
int maxAlignedSeriesNumInDevice = -1;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.util.List;
public class CompactionTaskInfo {
private final List<FileInfo> fileInfoList;
private final List<TsFileResource> resources;
private int maxConcurrentSeriesNum = 1;
private long maxChunkMetadataSize = 0;
private int maxChunkMetadataNumInDevice = 0;
private int maxChunkMetadataNumInSeries = 0;
private long modificationFileSize = 0;
private long totalFileSize = 0;
private long totalChunkNum = 0;
private long totalChunkMetadataSize = 0;
protected CompactionTaskInfo(List<TsFileResource> resources, List<FileInfo> fileInfoList) {
this.fileInfoList = fileInfoList;
this.resources = resources;
for (TsFileResource resource : resources) {
ModificationFile modificationFile = resource.getModFile();
if (modificationFile.exists()) {
modificationFileSize += modificationFile.getSize();
}
this.totalFileSize += resource.getTsFileSize();
}
for (FileInfo fileInfo : fileInfoList) {
maxConcurrentSeriesNum =
Math.max(maxConcurrentSeriesNum, fileInfo.maxAlignedSeriesNumInDevice);
maxChunkMetadataNumInSeries =
Math.max(maxChunkMetadataNumInSeries, fileInfo.maxSeriesChunkNum);
maxChunkMetadataNumInDevice =
Math.max(maxChunkMetadataNumInDevice, fileInfo.maxDeviceChunkNum);
maxChunkMetadataSize = Math.max(maxChunkMetadataSize, fileInfo.averageChunkMetadataSize);
totalChunkNum += fileInfo.totalChunkNum;
totalChunkMetadataSize += fileInfo.totalChunkNum * fileInfo.averageChunkMetadataSize;
}
}
public int getMaxChunkMetadataNumInDevice() {
return maxChunkMetadataNumInDevice;
}
public int getMaxChunkMetadataNumInSeries() {
return maxChunkMetadataNumInSeries;
}
public long getMaxChunkMetadataSize() {
return maxChunkMetadataSize;
}
public List<FileInfo> getFileInfoList() {
return fileInfoList;
}
public int getMaxConcurrentSeriesNum() {
return maxConcurrentSeriesNum;
}
public long getModificationFileSize() {
return modificationFileSize;
}
public long getTotalFileSize() {
return totalFileSize;
}
public long getTotalChunkNum() {
return totalChunkNum;
}
public List<TsFileResource> getResources() {
return resources;
}
public long getTotalChunkMetadataSize() {
return totalChunkMetadataSize;
}
}
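CompactionTaskInfo folds the per-file FileInfo statistics into task-level sums and maxima; a small hypothetical aggregation:

// Hypothetical aggregation over two source files, each 100 MB with 1000 chunks and an
// average chunk-metadata size of 32 bytes:
//   totalFileSize          = 200 MB        (sum)
//   totalChunkNum          = 2000          (sum)
//   totalChunkMetadataSize = 2000 * 32 B   (sum of totalChunkNum * averageChunkMetadataSize)
// maxChunkMetadataSize, maxChunkMetadataNumInDevice, maxChunkMetadataNumInSeries, and
// maxConcurrentSeriesNum keep the per-file maximum rather than a sum.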
......@@ -19,34 +19,62 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import java.io.IOException;
public class FastCompactionInnerCompactionEstimator extends AbstractInnerSpaceEstimator {
/**
* The metadata algorithm is: maxChunkMetadataSize * maxChunkMetadataNumInDevice * fileNum *
* maxConcurrentSeriesNum
*
* @return the estimated metadata memory cost
*/
@Override
public long calculatingMetadataMemoryCost(InnerCompactionTaskInfo taskInfo) {
return taskInfo.getFileInfoList().size()
* taskInfo.getMaxChunkMetadataNumInDevice()
* taskInfo.getMaxChunkMetadataSize()
* Math.max(config.getSubCompactionTaskNum(), taskInfo.getMaxConcurrentSeriesNum());
public long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) {
long cost = 0;
// add ChunkMetadata size of MultiTsFileDeviceIterator
cost +=
Math.min(
taskInfo.getTotalChunkMetadataSize(),
taskInfo.getFileInfoList().size()
* taskInfo.getMaxChunkMetadataNumInDevice()
* taskInfo.getMaxChunkMetadataSize());
// add ChunkMetadata size of targetFileWriter
long sizeForFileWriter =
(long)
((double) SystemInfo.getInstance().getMemorySizeForCompaction()
/ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
* IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
cost += sizeForFileWriter;
return cost;
}
/**
* The data algorithm is: (targetChunkSize * fileNum * maxConcurrentSeriesNum) + modsFileSize
*
* @return the estimated data memory cost
*/
@Override
public long calculatingDataMemoryCost(InnerCompactionTaskInfo taskInfo) {
long cost =
config.getTargetChunkSize()
public long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) throws IOException {
if (taskInfo.getTotalChunkNum() == 0) {
return taskInfo.getModificationFileSize();
}
long maxConcurrentSeriesNum =
Math.max(config.getSubCompactionTaskNum(), taskInfo.getMaxConcurrentSeriesNum());
long averageUncompressedChunkSize =
taskInfo.getTotalFileSize() * compressionRatio / taskInfo.getTotalChunkNum();
long maxConcurrentSeriesSizeOfTotalFiles =
averageUncompressedChunkSize
* taskInfo.getFileInfoList().size()
* Math.max(config.getSubCompactionTaskNum(), taskInfo.getMaxConcurrentSeriesNum());
cost += taskInfo.getModificationFileSize();
return cost;
* maxConcurrentSeriesNum
* taskInfo.getMaxChunkMetadataNumInSeries()
/ compressionRatio;
long maxTargetChunkWriterSize = config.getTargetChunkSize() * maxConcurrentSeriesNum;
long targetChunkWriterSize =
Math.min(maxConcurrentSeriesSizeOfTotalFiles, maxTargetChunkWriterSize);
long maxConcurrentChunkSizeFromSourceFile =
averageUncompressedChunkSize
* maxConcurrentSeriesNum
* calculatingMaxOverlapFileNumInSubCompactionTask(taskInfo.getResources());
return targetChunkWriterSize
+ maxConcurrentChunkSizeFromSourceFile
+ taskInfo.getModificationFileSize();
}
}
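A worked numeric pass through the new data-cost formula, with all inputs invented for illustration:

// Hypothetical inputs: totalFileSize = 100 MB, compressionRatio = 2, totalChunkNum = 1000,
// fileNum = 2, maxChunkMetadataNumInSeries = 5, maxConcurrentSeriesNum = max(4, 10) = 10,
// targetChunkSize = 1 MiB, maxOverlapFileNum = 2, modsFileSize = 0
//   averageUncompressedChunkSize         = 100 MB * 2 / 1000        = 200 KB
//   maxConcurrentSeriesSizeOfTotalFiles  = 200 KB * 2 * 10 * 5 / 2  = 10 MB
//   maxTargetChunkWriterSize             = 1 MiB * 10               ≈ 10.5 MB
//   targetChunkWriterSize                = min(10 MB, 10.5 MB)      = 10 MB
//   maxConcurrentChunkSizeFromSourceFile = 200 KB * 10 * 2          = 4 MB
//   estimated data cost                  = 10 MB + 4 MB + 0         = 14 MB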
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import java.io.IOException;
public class FastCrossSpaceCompactionEstimator extends AbstractCrossSpaceEstimator {
@Override
protected long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) {
long cost = 0;
// add ChunkMetadata size of MultiTsFileDeviceIterator
cost +=
Math.min(
taskInfo.getTotalChunkMetadataSize(),
taskInfo.getFileInfoList().size()
* taskInfo.getMaxChunkMetadataNumInDevice()
* taskInfo.getMaxChunkMetadataSize());
// add ChunkMetadata size of targetFileWriter
long sizeForFileWriter =
(long)
((double) SystemInfo.getInstance().getMemorySizeForCompaction()
/ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
* IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
cost += sizeForFileWriter;
return cost;
}
@Override
protected long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) throws IOException {
if (taskInfo.getTotalChunkNum() == 0) {
return taskInfo.getModificationFileSize();
}
long maxConcurrentSeriesNum =
Math.max(config.getSubCompactionTaskNum(), taskInfo.getMaxConcurrentSeriesNum());
long averageUncompressedChunkSize =
taskInfo.getTotalFileSize() * compressionRatio / taskInfo.getTotalChunkNum();
long maxConcurrentSeriesSizeOfTotalFiles =
averageUncompressedChunkSize
* taskInfo.getFileInfoList().size()
* maxConcurrentSeriesNum
* taskInfo.getMaxChunkMetadataNumInSeries()
/ compressionRatio;
long maxTargetChunkWriterSize = config.getTargetChunkSize() * maxConcurrentSeriesNum;
long targetChunkWriterSize =
Math.min(maxConcurrentSeriesSizeOfTotalFiles, maxTargetChunkWriterSize);
long maxConcurrentChunkSizeFromSourceFile =
averageUncompressedChunkSize
* maxConcurrentSeriesNum
* calculatingMaxOverlapFileNumInSubCompactionTask(taskInfo.getResources());
return targetChunkWriterSize
+ maxConcurrentChunkSizeFromSourceFile
+ taskInfo.getModificationFileSize();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class InplaceCompactionEstimator extends AbstractCrossSpaceEstimator {
private static final Logger logger = LoggerFactory.getLogger(InplaceCompactionEstimator.class);
private static final String LOG_FILE_COST = "Memory cost of file {} is {}";
private boolean tightEstimate;
private long maxSeqFileCost;
// the number of timeseries being compacted at the same time
private final int concurrentSeriesNum =
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
/** Total metadata size of each file. */
private final Map<TsFileResource, Long> fileMetaSizeMap = new HashMap<>();
/** Maximum memory cost of querying a timeseries in each file. */
private final Map<TsFileResource, Long> maxSeriesQueryCostMap = new HashMap<>();
public InplaceCompactionEstimator() {
this.tightEstimate = false;
this.maxSeqFileCost = 0;
}
@Override
public long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException {
if (tightEstimate) {
return calculateTightMemoryCost(unseqResource, seqResources);
} else {
return calculateLooseMemoryCost(unseqResource, seqResources);
}
}
private long calculateMemoryCost(
TsFileResource unseqResource,
List<TsFileResource> seqResources,
IFileQueryMemMeasurement unseqMeasurement,
IFileQueryMemMeasurement seqMeasurement)
throws IOException {
long cost = 0;
Long fileCost = unseqMeasurement.measure(unseqResource);
cost += fileCost;
for (TsFileResource seqFile : seqResources) {
fileCost = seqMeasurement.measure(seqFile);
if (fileCost > maxSeqFileCost) {
// only one file will be read at the same time, so only the largest one is recorded here
cost -= maxSeqFileCost;
cost += fileCost;
maxSeqFileCost = fileCost;
}
// but writing data into a new file may generate the same amount of metadata in memory
cost += calculateMetadataSize(seqFile);
}
return cost;
}
private long calculateLooseMemoryCost(
TsFileResource unseqResource, List<TsFileResource> seqResources) throws IOException {
return calculateMemoryCost(
unseqResource, seqResources, TsFileResource::getTsFileSize, this::calculateMetadataSize);
}
private long calculateTightMemoryCost(
TsFileResource unseqResource, List<TsFileResource> seqResources) throws IOException {
return calculateMemoryCost(
unseqResource,
seqResources,
this::calculateTightUnseqMemoryCost,
this::calculateTightSeqMemoryCost);
}
private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
Long cost = fileMetaSizeMap.get(seqFile);
if (cost == null) {
cost = getFileReader(seqFile).getFileMetadataSize();
fileMetaSizeMap.put(seqFile, cost);
logger.debug(LOG_FILE_COST, seqFile, cost);
}
return cost;
}
private long calculateTightFileMemoryCost(
TsFileResource seqFile, IFileQueryMemMeasurement measurement) throws IOException {
Long cost = maxSeriesQueryCostMap.get(seqFile);
if (cost == null) {
long[] chunkNums = findTotalAndLargestSeriesChunkNum(seqFile, getFileReader(seqFile));
long totalChunkNum = chunkNums[0];
long maxChunkNum = chunkNums[1];
cost = measurement.measure(seqFile) * maxChunkNum / totalChunkNum;
maxSeriesQueryCostMap.put(seqFile, cost);
logger.debug(LOG_FILE_COST, seqFile, cost);
}
return cost;
}
// this method traverses all ChunkMetadata to find out which series has the most chunks and uses
// its proportion to all series to get a maximum estimation
private long calculateTightSeqMemoryCost(TsFileResource seqFile) throws IOException {
long singleSeriesCost = calculateTightFileMemoryCost(seqFile, this::calculateMetadataSize);
long multiSeriesCost = concurrentSeriesNum * singleSeriesCost;
long maxCost = calculateMetadataSize(seqFile);
return Math.min(multiSeriesCost, maxCost);
}
// this method traverses all ChunkMetadata to find out which series has the most chunks and uses
// its proportion among all series to get a maximum estimation
private long calculateTightUnseqMemoryCost(TsFileResource unseqFile) throws IOException {
long singleSeriesCost = calculateTightFileMemoryCost(unseqFile, TsFileResource::getTsFileSize);
long multiSeriesCost = concurrentSeriesNum * singleSeriesCost;
long maxCost = unseqFile.getTsFileSize();
return Math.min(multiSeriesCost, maxCost);
}
// returns totalChunkNum of a file and the max number of chunks of a series
private long[] findTotalAndLargestSeriesChunkNum(
TsFileResource tsFileResource, TsFileSequenceReader sequenceReader) throws IOException {
long totalChunkNum = 0;
long maxChunkNum = Long.MIN_VALUE;
List<Path> paths = sequenceReader.getAllPaths();
for (Path path : paths) {
List<ChunkMetadata> chunkMetadataList = sequenceReader.getChunkMetadataList(path, true);
totalChunkNum += chunkMetadataList.size();
maxChunkNum = chunkMetadataList.size() > maxChunkNum ? chunkMetadataList.size() : maxChunkNum;
}
logger.debug(
"In file {}, total chunk num {}, series max chunk num {}",
tsFileResource,
totalChunkNum,
maxChunkNum);
return new long[] {totalChunkNum, maxChunkNum};
}
public void setTightEstimate(boolean tightEstimate) {
this.tightEstimate = tightEstimate;
}
}
......@@ -25,13 +25,15 @@ import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
public class ReadChunkInnerCompactionEstimator extends AbstractInnerSpaceEstimator {
@Override
public long calculatingMetadataMemoryCost(InnerCompactionTaskInfo taskInfo) {
public long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) {
long cost = 0;
// add ChunkMetadata size of MultiTsFileDeviceIterator
cost +=
taskInfo.getFileInfoList().size()
* taskInfo.getMaxChunkMetadataNumInDevice()
* taskInfo.getMaxChunkMetadataSize();
Math.min(
taskInfo.getTotalChunkMetadataSize(),
taskInfo.getFileInfoList().size()
* taskInfo.getMaxChunkMetadataNumInDevice()
* taskInfo.getMaxChunkMetadataSize());
// add ChunkMetadata size of targetFileWriter
long sizeForFileWriter =
......@@ -45,15 +47,27 @@ public class ReadChunkInnerCompactionEstimator extends AbstractInnerSpaceEstimat
}
@Override
public long calculatingDataMemoryCost(InnerCompactionTaskInfo taskInfo) {
// add max target chunk size and max source chunk size
long cost =
2
public long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) {
if (taskInfo.getTotalChunkNum() == 0) {
return taskInfo.getModificationFileSize();
}
long averageUncompressedChunkSize =
taskInfo.getTotalFileSize() * compressionRatio / taskInfo.getTotalChunkNum();
long maxConcurrentSeriesSizeOfTotalFiles =
averageUncompressedChunkSize
* taskInfo.getFileInfoList().size()
* taskInfo.getMaxConcurrentSeriesNum()
* IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
* taskInfo.getMaxChunkMetadataNumInSeries()
/ compressionRatio;
long maxTargetChunkWriterSize =
config.getTargetChunkSize() * taskInfo.getMaxConcurrentSeriesNum();
long targetChunkWriterSize =
Math.min(maxConcurrentSeriesSizeOfTotalFiles, maxTargetChunkWriterSize);
// add modification file size
cost += taskInfo.getModificationFileSize();
return cost;
long chunkSizeFromSourceFile =
averageUncompressedChunkSize * taskInfo.getMaxConcurrentSeriesNum();
return targetChunkWriterSize + chunkSizeFromSourceFile + taskInfo.getModificationFileSize();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimator {
private static final Logger logger =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
// the max cost of reading source seq file among all source seq files of this cross compaction
// task
private long maxCostOfReadingSeqFile;
// the max cost of writing target file
private long maxCostOfWritingTargetFile;
private int maxConcurrentSeriesNum = 1;
// the number of timeseries being compacted at the same time
private final int subCompactionTaskNum =
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
public ReadPointCrossCompactionEstimator() {
this.maxCostOfReadingSeqFile = 0;
this.maxCostOfWritingTargetFile = 0;
}
@Override
public long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException {
if (!addReadLock(seqResources, unseqResource)) {
// some file was deleted during selection, return -1
return -1L;
}
try {
long cost = 0;
cost += calculateReadingUnseqFile(unseqResource);
cost += calculateReadingSeqFiles(seqResources);
cost += calculatingWritingTargetFiles(seqResources, unseqResource);
return cost;
} finally {
releaseReadLock(seqResources, unseqResource);
}
}
/** Add read locks. Return false if any of the files were deleted. */
private boolean addReadLock(List<TsFileResource> seqResources, TsFileResource unseqResource) {
List<TsFileResource> allResources = new ArrayList<>(seqResources);
allResources.add(unseqResource);
return CompactionEstimateUtils.addReadLock(allResources);
}
private void releaseReadLock(List<TsFileResource> seqResources, TsFileResource unseqResource) {
seqResources.forEach(TsFileResource::readUnlock);
unseqResource.readUnlock();
}
/**
* Calculate the memory cost of reading source unseq files in the cross-space compaction. Double
* the total size of the timeseries to be compacted at the same time in all unseq files.
*
* @throws IOException if an I/O error occurs
*/
private long calculateReadingUnseqFile(TsFileResource unseqResource) throws IOException {
TsFileSequenceReader reader = getFileReader(unseqResource);
FileInfo fileInfo = CompactionEstimateUtils.getSeriesAndDeviceChunkNum(reader);
// it is the max aligned series num of one device when the tsfile contains aligned series,
// otherwise it is the sub-compaction task num.
int concurrentSeriesNum =
fileInfo.maxAlignedSeriesNumInDevice == -1
? subCompactionTaskNum
: fileInfo.maxAlignedSeriesNumInDevice;
maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum, concurrentSeriesNum);
if (fileInfo.totalChunkNum == 0) { // totalChunkNum == 0 means this unseq tsfile has no chunks
logger.warn(
"calculateReadingUnseqFile(), find 1 empty unSeq tsFile: {}.",
unseqResource.getTsFilePath());
return 0;
}
// it means the max size of a timeseries in this file when reading all of its chunks into memory.
long resourceFileSize =
compressionRatio
* concurrentSeriesNum
* (unseqResource.getTsFileSize() * fileInfo.maxSeriesChunkNum / fileInfo.totalChunkNum);
// add mod file size
long modFileSize = unseqResource.getModFile().getSize();
return resourceFileSize + modFileSize;
}
/**
* Calculate the memory cost of reading source seq files in the cross-space compaction. Select
* the maximum size of the timeseries to be compacted at the same time in one seq file, because
* only one seq file will be queried at a time.
*
* @throws IOException if an I/O error occurs
*/
private long calculateReadingSeqFiles(List<TsFileResource> seqResources) throws IOException {
long cost = 0;
for (TsFileResource seqResource : seqResources) {
TsFileSequenceReader reader = getFileReader(seqResource);
FileInfo fileInfo = CompactionEstimateUtils.getSeriesAndDeviceChunkNum(reader);
// it is the max aligned series num of one device when the tsfile contains aligned series,
// otherwise it is the sub-compaction task num.
int concurrentSeriesNum =
fileInfo.maxAlignedSeriesNumInDevice == -1
? subCompactionTaskNum
: fileInfo.maxAlignedSeriesNumInDevice;
maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum, concurrentSeriesNum);
long seqFileCost;
if (fileInfo.totalChunkNum == 0) { // totalChunkNum == 0 means this seq tsfile has no chunks
logger.warn(
"calculateReadingSeqFiles(), find 1 empty seq tsFile: {}.",
seqResource.getTsFilePath());
seqFileCost = 0;
} else {
// We need to multiply the compression ratio here.
seqFileCost =
compressionRatio
* seqResource.getTsFileSize()
* concurrentSeriesNum
/ fileInfo.totalChunkNum;
}
if (seqFileCost > maxCostOfReadingSeqFile) {
// Only one seq file will be read at the same time.
// Besides reading chunks into the chunk cache, data points also need to be deserialized into
// the merge reader. We count the merge reader cost here, so the chunk cache cost is
// unnecessary.
cost -= maxCostOfReadingSeqFile;
cost += seqFileCost;
maxCostOfReadingSeqFile = seqFileCost;
}
// add mod file size
cost += seqResource.getModFile().getSize();
}
return cost;
}
/**
* Calculate the memory cost of writing target files in the cross-space compaction, including
* the metadata size of all source files and the size of concurrent target chunks.
*
* @throws IOException if an I/O error occurs
*/
private long calculatingWritingTargetFiles(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException {
long cost = 0;
for (TsFileResource seqResource : seqResources) {
TsFileSequenceReader reader = getFileReader(seqResource);
// add seq file metadata size
cost += reader.getFileMetadataSize();
}
// add unseq file metadata size
cost += getFileReader(unseqResource).getFileMetadataSize();
// concurrent series chunk size
long writingTargetCost = maxConcurrentSeriesNum * config.getTargetChunkSize();
if (writingTargetCost > maxCostOfWritingTargetFile) {
cost -= maxCostOfWritingTargetFile;
cost += writingTargetCost;
maxCostOfWritingTargetFile = writingTargetCost;
}
return cost;
}
}
......@@ -26,7 +26,7 @@ import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ICompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ICrossSpaceSelector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCrossSpaceEstimator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossCompactionTaskResource;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossSpaceCompactionCandidate;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
......@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
......@@ -58,7 +59,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
private final int maxCrossCompactionFileNum;
private final long maxCrossCompactionFileSize;
private AbstractCompactionEstimator compactionEstimator;
private final AbstractCrossSpaceEstimator compactionEstimator;
public RewriteCrossSpaceCompactionSelector(
String logicalStorageGroupName,
......@@ -80,8 +81,9 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileSize();
this.compactionEstimator =
ICompactionSelector.getCompactionEstimator(
IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer(), false);
(AbstractCrossSpaceEstimator)
ICompactionSelector.getCompactionEstimator(
IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer(), false);
}
/**
......@@ -173,8 +175,15 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
}
}
List<TsFileResource> newSelectedSeqResources = new ArrayList<>(taskResource.getSeqFiles());
newSelectedSeqResources.addAll(targetSeqFiles);
List<TsFileResource> newSelectedUnseqResources =
new ArrayList<>(taskResource.getUnseqFiles());
newSelectedUnseqResources.add(unseqFile);
long memoryCost =
compactionEstimator.estimateCrossCompactionMemory(targetSeqFiles, unseqFile);
compactionEstimator.estimateCrossCompactionMemory(
newSelectedSeqResources, newSelectedUnseqResources);
if (!canAddToTaskResource(taskResource, unseqFile, targetSeqFiles, memoryCost)) {
break;
}
......@@ -254,7 +263,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
return taskResource.getTotalFileNums() + 1 + seqFiles.size() <= maxCrossCompactionFileNum
&& taskResource.getTotalFileSize() + totalFileSize <= maxCrossCompactionFileSize
&& taskResource.getTotalMemoryCost() + memoryCost < memoryBudget;
&& memoryCost < memoryBudget;
}
private boolean canSubmitCrossTask(
......
......@@ -69,7 +69,7 @@ public class CrossCompactionTaskResource {
TsFileResource unseqFile, List<TsFileResource> seqFiles, long memoryCost) {
addUnseqFile(unseqFile);
addTargetSeqFiles(seqFiles);
increaseMemoryCost(memoryCost);
updateMemoryCost(memoryCost);
}
private void addUnseqFile(TsFileResource file) {
......@@ -88,8 +88,8 @@ public class CrossCompactionTaskResource {
countStatistic(file);
}
private void increaseMemoryCost(long newMemoryCost) {
this.totalMemoryCost += newMemoryCost;
private void updateMemoryCost(long newMemoryCost) {
this.totalMemoryCost = Math.max(totalMemoryCost, newMemoryCost);
}
private void countStatistic(TsFileResource file) {
......
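Because the estimator now prices the whole selected file set on every iteration, the task resource keeps the latest (largest) estimate instead of summing per-file increments; a hypothetical selection loop:

// Hypothetical selection loop with updateMemoryCost:
//   estimate({seq1, unseq1})       = 40 MB -> totalMemoryCost = 40 MB
//   estimate({seq1, seq2, unseq1}) = 55 MB -> totalMemoryCost = 55 MB (max, not 40 + 55)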
......@@ -242,6 +242,9 @@ public class SystemInfo {
}
public synchronized void resetCompactionMemoryCost(long compactionMemoryCost) {
if (!config.isEnableCompactionMemControl()) {
return;
}
this.compactionMemoryCost.addAndGet(-compactionMemoryCost);
}
......
......@@ -101,7 +101,6 @@ public class CompactionSchedulerTest {
.getConfig()
.setInnerUnseqCompactionPerformer(InnerUnseqCompactionPerformer.READ_POINT);
IoTDBDescriptor.getInstance().getConfig().setMinCrossCompactionUnseqFileLevel(0);
IoTDBDescriptor.getInstance().getConfig().setEnableCompactionMemControl(false);
CompactionTaskManager.getInstance().start();
while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
try {
......
......@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCrossSpaceCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
......@@ -72,12 +73,12 @@ public class CompactionTaskMemCostEstimatorTest extends AbstractCompactionTest {
@Test
public void testEstimateFastCompactionInnerSpaceCompactionTaskMemCost()
throws IOException, MetadataException, WriteProcessException {
createFiles(3, 10, 5, 100000, 0, 0, 50, 50, true, true);
tsFileManager.addAll(seqResources, true);
List<TsFileResource> tsFileList = tsFileManager.getTsFileList(true);
System.out.println(tsFileList.get(0).getTsFile().getAbsolutePath());
createFiles(5, 10, 5, 10000, 0, 0, 50, 50, true, false);
createFiles(10, 4, 5, 10000, 1000, 0, 30, 90, true, false);
tsFileManager.addAll(unseqResources, false);
long cost =
new FastCompactionInnerCompactionEstimator().estimateInnerCompactionMemory(tsFileList);
new FastCompactionInnerCompactionEstimator().estimateInnerCompactionMemory(unseqResources);
Assert.assertTrue(cost > 0);
}
......@@ -91,4 +92,15 @@ public class CompactionTaskMemCostEstimatorTest extends AbstractCompactionTest {
new FastCompactionInnerCompactionEstimator().estimateInnerCompactionMemory(tsFileList);
Assert.assertTrue(cost > 0);
}
@Test
public void testEstimateFastCompactionCrossSpaceCompactionTaskMemCost1()
throws IOException, MetadataException, WriteProcessException {
createFiles(3, 10, 5, 100, 0, 0, 50, 50, false, true);
createFiles(4, 10, 5, 400, 0, 0, 30, 50, false, false);
long cost =
new FastCrossSpaceCompactionEstimator()
.estimateCrossCompactionMemory(seqResources, unseqResources);
Assert.assertTrue(cost > 0);
}
}