未验证 提交 9fc331b4 编写于 作者: zhanglingzhe0820 提交者: GitHub

[IOTDB-1294] Compaction mods for new mods structure (#3013)

* compaction mods for new mods structure

* fix ci
Co-authored-by: zhanglingzhe <surevil@foxmail.com>
上级 fc0af027
......@@ -346,6 +346,9 @@ public abstract class TsFileManagement {
seqFile.removeModFile();
if (mergingModification != null) {
for (Modification modification : mergingModification.getModifications()) {
// we have to set modification offset to MAX_VALUE, as the offset of source chunk may
// change after compaction
modification.setFileOffset(Long.MAX_VALUE);
seqFile.getModFile().write(modification);
}
try {
......
......@@ -115,6 +115,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
try (ModificationFile modificationFile =
new ModificationFile(targetTsFile.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
for (Modification modification : modifications) {
// we have to set modification offset to MAX_VALUE, as the offset of source chunk may
// change after compaction
modification.setFileOffset(Long.MAX_VALUE);
modificationFile.write(modification);
}
}
......
......@@ -75,19 +75,13 @@ public class CompactionUtils {
}
private static Pair<ChunkMetadata, Chunk> readByAppendMerge(
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap,
Map<String, List<Modification>> modificationCache,
PartialPath seriesPath,
List<Modification> modifications)
throws IOException {
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException {
ChunkMetadata newChunkMetadata = null;
Chunk newChunk = null;
for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry :
readerChunkMetadataMap.entrySet()) {
TsFileSequenceReader reader = entry.getKey();
List<ChunkMetadata> chunkMetadataList = entry.getValue();
modifyChunkMetaDataWithCache(
reader, chunkMetadataList, modificationCache, seriesPath, modifications);
for (ChunkMetadata chunkMetadata : chunkMetadataList) {
Chunk chunk = reader.readMemChunk(chunkMetadata);
if (newChunkMetadata == null) {
......@@ -133,16 +127,9 @@ public class CompactionUtils {
RateLimiter compactionWriteRateLimiter,
Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
TsFileResource targetResource,
RestorableTsFileIOWriter writer,
Map<String, List<Modification>> modificationCache,
List<Modification> modifications)
throws IOException, IllegalPathException {
Pair<ChunkMetadata, Chunk> chunkPair =
readByAppendMerge(
entry.getValue(),
modificationCache,
new PartialPath(device, entry.getKey()),
modifications);
RestorableTsFileIOWriter writer)
throws IOException {
Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(entry.getValue());
ChunkMetadata newChunkMetadata = chunkPair.left;
Chunk newChunk = chunkPair.right;
if (newChunkMetadata != null && newChunk != null) {
......@@ -356,9 +343,7 @@ public class CompactionUtils {
compactionWriteRateLimiter,
sensorReaderChunkMetadataListEntry,
targetResource,
writer,
modificationCache,
modifications);
writer);
} else {
logger.debug("{} [Compaction] page too small, use deserialize merge", storageGroup);
// we have to deserialize chunks to merge pages
......
......@@ -121,13 +121,7 @@ public class CompactionChunkTest extends LevelCompactionTest {
for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry :
measurementChunkMetadataMap.entrySet()) {
CompactionUtils.writeByAppendMerge(
device,
compactionWriteRateLimiter,
entry,
targetTsfileResource,
writer,
new HashMap<>(),
new ArrayList<>());
device, compactionWriteRateLimiter, entry, targetTsfileResource, writer);
}
reader.close();
}
......
......@@ -23,6 +23,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
......@@ -42,6 +46,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
......@@ -154,6 +159,72 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
}
/**
 * As we change the structure of mods file in 0.12, we have to check whether a modification record
 * is valid by its offset in tsfile. A deletion whose offset points into the middle of the source
 * file (i.e. not {@code Long.MAX_VALUE}) must still be applied correctly after a level compaction
 * merge, so the merged data is read back without the deleted range [300, 310].
 */
@Test
public void testCompactionModsByOffsetAfterMerge() throws IllegalPathException, IOException {
  // Force page-level (append) merge by dropping the page point threshold to 1;
  // the previous value is restored at the end of the test.
  int prevPageLimit =
      IoTDBDescriptor.getInstance().getConfig().getMergePagePointNumberThreshold();
  IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(1);

  LevelCompactionTsFileManagement levelCompactionTsFileManagement =
      new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
  TsFileResource forthSeqTsFileResource = seqResources.get(3);
  PartialPath path =
      new PartialPath(
          deviceIds[0]
              + TsFileConstant.PATH_SEPARATOR
              + measurementSchemas[0].getMeasurementId());

  // Write a deletion of timestamps [300, 310] whose file offset lies inside the source
  // tsfile (size / 10) — a "real" offset rather than the MAX_VALUE sentinel, so the
  // compaction path must honor it when merging.
  try (ModificationFile sourceModificationFile =
      new ModificationFile(
          forthSeqTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
    Modification modification =
        new Deletion(path, forthSeqTsFileResource.getTsFileSize() / 10, 300, 310);
    sourceModificationFile.write(modification);
  }

  levelCompactionTsFileManagement.addAll(seqResources, true);
  levelCompactionTsFileManagement.addAll(unseqResources, false);
  levelCompactionTsFileManagement.forkCurrentFileList(0);
  CompactionMergeTask compactionMergeTask =
      levelCompactionTsFileManagement
      .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
  compactionMergeWorking = true;
  compactionMergeTask.run();
  // Busy-wait until closeCompactionMergeCallBack flips the flag, signaling the merge is done.
  // NOTE(review): this mirrors the other tests in this class; a latch would be more robust.
  while (compactionMergeWorking) {
    // wait
  }

  // Read the merged series back; the 11 deleted points (300..310) must be gone.
  QueryContext context = new QueryContext();
  IBatchReader tsFilesReader =
      new SeriesRawDataBatchReader(
          path,
          measurementSchemas[0].getType(),
          context,
          levelCompactionTsFileManagement.getTsFileList(true),
          new ArrayList<>(),
          null,
          null,
          true);
  long count = 0L;
  while (tsFilesReader.hasNextBatch()) {
    // Debug println of every timestamp removed — tests should not spam stdout.
    count += tsFilesReader.nextBatch().length();
  }
  assertEquals(489, count);

  // Clean up produced files and restore the modified config value.
  List<TsFileResource> tsFileResourceList = levelCompactionTsFileManagement.getTsFileList(true);
  for (TsFileResource tsFileResource : tsFileResourceList) {
    tsFileResource.getModFile().remove();
    tsFileResource.remove();
  }
  IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(prevPageLimit);
}
/** close compaction merge callback, to release some locks */
private void closeCompactionMergeCallBack() {
this.compactionMergeWorking = false;
......
......@@ -73,7 +73,7 @@ public class LevelCompactionModsTest extends LevelCompactionTest {
try (ModificationFile sourceModificationFile =
new ModificationFile(sourceTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
modification1 = new Deletion(new PartialPath(deviceIds[0], "sensor0"), 0, 0);
modification2 = new Deletion(new PartialPath(deviceIds[0], "sensor1"), 0, 0);
modification2 = new Deletion(new PartialPath(deviceIds[0], "sensor1"), Long.MAX_VALUE, 0);
sourceModificationFile.write(modification1);
sourceModificationFile.write(modification2);
filterModifications.add(modification1);
......@@ -89,4 +89,47 @@ public class LevelCompactionModsTest extends LevelCompactionTest {
assertEquals(modification2, modifications.stream().findFirst().get());
}
}
/**
 * As we change the structure of mods file in 0.12, we have to check whether a modification record
 * is valid by its offset in tsfile. When source-level mods are carried over to the target file by
 * {@code renameLevelFilesMods}, filtered records are dropped and the surviving record's offset is
 * rewritten to {@code Long.MAX_VALUE}, since source chunk offsets are meaningless after compaction.
 */
@Test
public void testCompactionModsByOffset() throws IllegalPathException, IOException {
  LevelCompactionTsFileManagement levelCompactionTsFileManagement =
      new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
  TsFileResource sourceTsFileResource = seqResources.get(0);
  TsFileResource targetTsFileResource = seqResources.get(1);
  List<Modification> filterModifications = new ArrayList<>();

  // Both deletions carry a "real" offset pointing into the middle of the source file.
  long midFileOffset = sourceTsFileResource.getTsFileSize() / 2;
  Modification modification1 =
      new Deletion(new PartialPath(deviceIds[0], "sensor0"), midFileOffset, 0, 100);
  Modification modification2 =
      new Deletion(new PartialPath(deviceIds[0], "sensor1"), midFileOffset, 0, 100);

  try (ModificationFile sourceModificationFile =
      new ModificationFile(sourceTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
    sourceModificationFile.write(modification1);
    sourceModificationFile.write(modification2);
    // Only modification1 is filtered out; modification2 should survive the rename.
    filterModifications.add(modification1);
  }

  List<TsFileResource> sourceTsFileResources = new ArrayList<>();
  sourceTsFileResources.add(sourceTsFileResource);
  levelCompactionTsFileManagement.renameLevelFilesMods(
      filterModifications, sourceTsFileResources, targetTsFileResource);

  // Exactly one record remains, and its offset was rewritten to the MAX_VALUE sentinel.
  try (ModificationFile targetModificationFile =
      new ModificationFile(targetTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
    Collection<Modification> modifications = targetModificationFile.getModifications();
    assertEquals(1, modifications.size());
    assertEquals(Long.MAX_VALUE, modifications.stream().findFirst().get().getFileOffset());
  }
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册