未验证 提交 ec700a1e 编写于 作者: Z Zhijia Cao 提交者: GitHub

added the selection of files to be merged based on mods file size in the inner...

added the selection of files to be merged based on mods file size in the inner compaction selector (#10480)
上级 142cfbbc
......@@ -47,6 +47,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
public class InnerSpaceCompactionTask extends AbstractCompactionTask {
......@@ -67,6 +68,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
protected boolean[] isHoldingReadLock;
protected boolean[] isHoldingWriteLock;
protected long maxModsFileSize;
public InnerSpaceCompactionTask(
long timePartition,
TsFileManager tsFileManager,
......@@ -342,6 +345,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
sumOfCompactionCount = 0;
maxFileVersion = -1L;
maxCompactionCount = -1;
maxModsFileSize = 0;
if (selectedTsFileResourceList == null) {
return;
}
......@@ -357,6 +361,10 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
if (fileName.getVersion() > maxFileVersion) {
maxFileVersion = fileName.getVersion();
}
if (!Objects.isNull(resource.getModFile())) {
long modsFileSize = resource.getModFile().getSize();
maxModsFileSize = Math.max(maxModsFileSize, modsFileSize);
}
} catch (IOException e) {
LOGGER.warn("Fail to get the tsfile name of {}", resource.getTsFile(), e);
}
......@@ -383,6 +391,10 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
return maxFileVersion;
}
public long getMaxModsFileSize() {
return maxModsFileSize;
}
@Override
public String toString() {
return storageGroupName
......
......@@ -63,6 +63,13 @@ public class DefaultCompactionTaskComparatorImpl implements ICompactionTaskCompa
public int compareInnerSpaceCompactionTask(
InnerSpaceCompactionTask o1, InnerSpaceCompactionTask o2) {
// if max mods file size of o1 and o2 are different
// we prefer to execute task with greater mods file
if (o1.getMaxModsFileSize() != o2.getMaxModsFileSize()) {
return o2.getMaxModsFileSize() > o1.getMaxModsFileSize() ? 1 : -1;
}
// if the sum of compaction count of the selected files are different
// we prefer to execute task with smaller compaction count
// this can reduce write amplification
......
......@@ -26,6 +26,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.Compacti
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.comparator.ICompactionTaskComparator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.IInnerSeqSpaceSelector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.IInnerUnseqSpaceSelector;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
......@@ -41,6 +42,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
/**
......@@ -65,6 +67,7 @@ public class SizeTieredCompactionSelector
protected boolean sequence;
protected TsFileManager tsFileManager;
protected boolean hasNextTimePartition;
private static final long MODS_FILE_SIZE_THRESHOLD = 1024 * 1024 * 50L;
public SizeTieredCompactionSelector(
String storageGroupName,
......@@ -89,28 +92,23 @@ public class SizeTieredCompactionSelector
* longer search for higher layers), otherwise it will return true.
*
* @param level the level to be searched
* @param taskPriorityQueue it stores the batches of files to be compacted and the total size of
* each batch
* @return return whether to continue the search to higher levels
* @throws IOException if the name of tsfile is incorrect
*/
@SuppressWarnings({"squid:S3776", "squid:S135"})
private boolean selectLevelTask(
int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
throws IOException {
boolean shouldContinueToSearch = true;
private List<Pair<List<TsFileResource>, Long>> selectSingleLevel(int level) throws IOException {
List<TsFileResource> selectedFileList = new ArrayList<>();
long selectedFileSize = 0L;
long targetCompactionFileSize = config.getTargetCompactionFileSize();
List<Pair<List<TsFileResource>, Long>> taskList = new ArrayList<>();
for (TsFileResource currentFile : tsFileResources) {
TsFileNameGenerator.TsFileName currentName =
TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
if (currentName.getInnerCompactionCnt() != level) {
// meet files of another level
if (selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
taskList.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
}
selectedFileList = new ArrayList<>();
selectedFileSize = 0L;
......@@ -134,8 +132,7 @@ public class SizeTieredCompactionSelector
|| selectedFileList.size() >= config.getFileLimitPerInnerTask()) {
// submit the task
if (selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
taskList.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
}
selectedFileList = new ArrayList<>();
selectedFileSize = 0L;
......@@ -145,17 +142,19 @@ public class SizeTieredCompactionSelector
// if next time partition exists
// submit a merge task even it does not meet the requirement for file num or file size
if (hasNextTimePartition && selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
taskList.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
}
return shouldContinueToSearch;
return taskList;
}
/**
* This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
* there are more than a batch of files to be merged on a certain layer, it does not search to
* higher layers. It creates a compaction thread for each batch of files and put it into the
* candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
* This method is used to select a batch of files to be merged. There are two ways to select
* files.If the first method selects the appropriate file, the second method is not executed. The
* first one is based on the mods file corresponding to the file. We will preferentially select
* file with mods file larger than 50M. The second way is based on the file layer from layer 0 to
* the highest layer. If there are more than a batch of files to be merged on a certain layer, it
* does not search to higher layers. It creates a compaction thread for each batch of files and
* put it into the candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
*
* @return Returns whether the file was found and submits the merge task
*/
......@@ -165,12 +164,14 @@ public class SizeTieredCompactionSelector
PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
try {
int maxLevel = searchMaxFileLevel();
for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
break;
}
// preferentially select files based on mods file size
taskPriorityQueue.addAll(selectMaxModsFileTask());
// if a suitable file is not selected in the first step, select the file at the tsfile level
if (taskPriorityQueue.isEmpty()) {
taskPriorityQueue.addAll(selectLevelTask());
}
List<List<TsFileResource>> taskList = new LinkedList<>();
while (!taskPriorityQueue.isEmpty()) {
List<TsFileResource> resources = taskPriorityQueue.poll().left;
......@@ -183,6 +184,32 @@ public class SizeTieredCompactionSelector
return Collections.emptyList();
}
private List<Pair<List<TsFileResource>, Long>> selectLevelTask() throws IOException {
List<Pair<List<TsFileResource>, Long>> taskList = new ArrayList<>();
int maxLevel = searchMaxFileLevel();
for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
List<Pair<List<TsFileResource>, Long>> singleLevelTask = selectSingleLevel(currentLevel);
if (!singleLevelTask.isEmpty()) {
taskList.addAll(singleLevelTask);
break;
}
}
return taskList;
}
private List<Pair<List<TsFileResource>, Long>> selectMaxModsFileTask() {
List<Pair<List<TsFileResource>, Long>> taskList = new ArrayList<>();
for (TsFileResource tsFileResource : tsFileResources) {
ModificationFile modFile = tsFileResource.getModFile();
if (!Objects.isNull(modFile) && modFile.getSize() > MODS_FILE_SIZE_THRESHOLD) {
taskList.add(
new Pair<>(Collections.singletonList(tsFileResource), tsFileResource.getTsFileSize()));
LOGGER.debug("select tsfile {},the mod file size is {}", tsFileResource, modFile.getSize());
}
}
return taskList;
}
private int searchMaxFileLevel() throws IOException {
int maxLevel = -1;
for (TsFileResource currentFile : tsFileResources) {
......
......@@ -61,6 +61,8 @@ public class ModificationFile implements AutoCloseable {
private final SecureRandom random = new SecureRandom();
private static final long COMPACT_THRESHOLD = 1024 * 1024;
private boolean hasCompacted = false;
/**
* Construct a ModificationFile using a file as its storage.
*
......@@ -200,7 +202,7 @@ public class ModificationFile implements AutoCloseable {
public void compact() {
long originFileSize = getSize();
if (originFileSize > COMPACT_THRESHOLD) {
if (originFileSize > COMPACT_THRESHOLD && !hasCompacted) {
Map<String, List<Modification>> pathModificationMap =
getModifications().stream().collect(Collectors.groupingBy(Modification::getPathString));
String newModsFileName = filePath + COMPACT_SUFFIX;
......@@ -236,6 +238,7 @@ public class ModificationFile implements AutoCloseable {
} catch (IOException e) {
logger.error("remove origin file or rename new mods file error.", e);
}
hasCompacted = true;
}
}
......
......@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
......@@ -28,19 +30,23 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.Compacti
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionConfigRestorer;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import com.google.common.collect.MinMaxPriorityQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -315,6 +321,40 @@ public class CompactionTaskComparatorTest {
}
}
@Test
public void testCompareByMaxModsFileSize()
throws InterruptedException, IllegalPathException, IOException {
for (int i = 0; i < 100; ++i) {
List<TsFileResource> resources = new ArrayList<>();
for (int j = i; j < 100; ++j) {
resources.add(
new FakedTsFileResource(new File(String.format("%d-%d-0-0.tsfile", i + j, i + j)), j));
}
FakedInnerSpaceCompactionTask innerTask =
new FakedInnerSpaceCompactionTask(
"fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
compactionTaskQueue.put(innerTask);
}
String targetFileName = "101-101-0-0.tsfile";
FakedTsFileResource fakedTsFileResource =
new FakedTsFileResource(new File(targetFileName), 100);
fakedTsFileResource.getModFile().write(new Deletion(new PartialPath("root.test.d1"), 1, 1));
compactionTaskQueue.put(
new FakedInnerSpaceCompactionTask(
"fakeSg",
0,
tsFileManager,
taskNum,
true,
Collections.singletonList(fakedTsFileResource),
0));
FakedInnerSpaceCompactionTask task = (FakedInnerSpaceCompactionTask) compactionTaskQueue.take();
Assert.assertEquals(
targetFileName, task.getSelectedTsFileResourceList().get(0).getTsFile().getName());
fakedTsFileResource.getModFile().remove();
}
private static class FakedInnerSpaceCompactionTask extends InnerSpaceCompactionTask {
public FakedInnerSpaceCompactionTask(
......
......@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.inner;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
......@@ -26,6 +27,8 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SizeTieredCompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
......@@ -41,6 +44,8 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest {
@Before
public void setUp()
......@@ -623,4 +628,32 @@ public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest {
Assert.fail();
}
}
@Test
public void testSelectWhenModsFileGreaterThan50M()
throws IOException, MetadataException, WriteProcessException {
createFiles(6, 2, 3, 50, 0, 10000, 50, 50, false, true);
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
TsFileResource tsFileResource = seqResources.get(0);
ModificationFile modFile = tsFileResource.getModFile();
while (modFile.getSize() < 1024 * 1024 * 50) {
modFile.write(
new Deletion(
new PartialPath(COMPACTION_TEST_SG + PATH_SEPARATOR + "**"),
Long.MIN_VALUE,
Long.MAX_VALUE));
}
SizeTieredCompactionSelector selector =
new SizeTieredCompactionSelector("", "", 0, true, tsFileManager);
// copy candidate source file list
List<TsFileResource> resources = tsFileManager.getOrCreateSequenceListByTimePartition(0);
List<List<TsFileResource>> taskResource = selector.selectInnerSpaceTask(resources);
Assert.assertEquals(1, taskResource.size());
modFile.remove();
}
}
......@@ -33,6 +33,7 @@ public class FakedTsFileResource extends TsFileResource {
private String fakeTsfileName;
public FakedTsFileResource(long tsFileSize, String name) {
super(new File(name));
this.timeIndex = new FileTimeIndex();
this.tsFileSize = tsFileSize;
fakeTsfileName = name;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册