未验证 提交 f80c2894 编写于 作者: S shuwenwei 提交者: GitHub

Fix point priority reader array index out-of-bounds exception (#10919)

上级 f1a40124
......@@ -244,8 +244,9 @@ public abstract class SeriesCompactionExecutor {
|| firstPageElement.needForceDecoding) {
// has overlap or modified pages, then deserialize it
summary.pageOverlapOrModified += 1;
pointPriorityReader.addNewPage(firstPageElement);
compactWithOverlapPages();
if (pointPriorityReader.addNewPageIfPageNotEmpty(firstPageElement)) {
compactWithOverlapPages();
}
} else {
// has none overlap or modified pages, flush it to chunk writer directly
summary.pageNoneOverlap += 1;
......@@ -276,7 +277,9 @@ public abstract class SeriesCompactionExecutor {
} else {
// unsealed page is not large enough or page.endTime > file.endTime, then deserialize it
summary.pageNoneOverlapButDeserialize += 1;
pointPriorityReader.addNewPage(pageElement);
if (!pointPriorityReader.addNewPageIfPageNotEmpty(pageElement)) {
return;
}
// write data points of the current page into chunk writer
TimeValuePair point;
......@@ -352,7 +355,7 @@ public abstract class SeriesCompactionExecutor {
|| nextPageElement.needForceDecoding) {
// next page is overlapped or modified, then deserialize it
summary.pageOverlapOrModified++;
pointPriorityReader.addNewPage(nextPageElement);
pointPriorityReader.addNewPageIfPageNotEmpty(nextPageElement);
} else {
// has none overlap or modified pages, flush it to chunk writer directly
summary.pageFakeOverlap += 1;
......
......@@ -41,6 +41,9 @@ public class PointElement {
} else {
this.pointReader = pageElement.batchData.getTsBlockAlignedRowIterator();
}
if (!pointReader.hasNextTimeValuePair()) {
return;
}
this.timeValuePair = pointReader.nextTimeValuePair();
this.timestamp = timeValuePair.getTimestamp();
this.priority = pageElement.priority;
......
......@@ -170,8 +170,10 @@ public class PointPriorityReader {
* Add a new overlapped page.
*
* @throws IOException if io errors occurred
* @return whether page is added into the queue
*/
public void addNewPage(PageElement pageElement) throws IOException {
public boolean addNewPageIfPageNotEmpty(PageElement pageElement)
throws IOException, IllegalPathException, WriteProcessException {
if (currentPointElement != null) {
nextPointInOtherPage = Math.min(nextPointInOtherPage, pageElement.startTime);
if (currentPoint.getTimestamp() >= nextPointInOtherPage) {
......@@ -179,6 +181,14 @@ public class PointPriorityReader {
currentPointElement = null;
}
}
pointQueue.add(new PointElement(pageElement));
PointElement pointElement = new PointElement(pageElement);
boolean pageIsNotEmpty = pointElement.timeValuePair != null;
if (pageIsNotEmpty) {
pointQueue.add(pointElement);
} else {
removePage.call(pageElement);
}
return pageIsNotEmpty;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Regression test for #10919: cross-space compaction of an aligned series whose value chunks
 * contain fully-null (empty) pages must not throw an ArrayIndexOutOfBoundsException in the
 * point priority reader, and must still produce a correct merged time range.
 */
public class FastCompactionPerformerWithEmptyPageTest extends AbstractCompactionTest {

  @Before
  public void setUp()
      throws IOException, WriteProcessException, MetadataException, InterruptedException {
    super.setUp();
  }

  @After
  public void tearDown() throws IOException, StorageEngineException {
    super.tearDown();
  }

  @Test
  public void test1() throws IOException, IllegalPathException {
    String device = "root.testsg.d1";

    // Seq file: s2/s3 are all-null in the first chunk page [10,30]; after the mods file below
    // deletes s1's data in [0,31], that whole page becomes empty and must be skipped safely.
    TsFileResource seqFile1 = createEmptyFileAndResource(true);
    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqFile1)) {
      writer.startChunkGroup("d1");
      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
          Arrays.asList("s1", "s2", "s3"),
          new TimeRange[][] {new TimeRange[] {new TimeRange(10, 30)}},
          TSEncoding.RLE,
          CompressionType.UNCOMPRESSED,
          Arrays.asList(false, true, true));
      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
          Arrays.asList("s1", "s2", "s3"),
          new TimeRange[][] {new TimeRange[] {new TimeRange(40, 50)}},
          TSEncoding.RLE,
          CompressionType.UNCOMPRESSED,
          Arrays.asList(false, false, false));
      writer.endChunkGroup();
      writer.endFile();
    }
    seqFile1.updateStartTime(device, 10);
    seqFile1.updateEndTime(device, 50);
    seqFile1.serialize();
    // Delete s1 in [0,31] so the first page of the seq file holds no surviving values.
    generateModsFile(Arrays.asList(new PartialPath("root.testsg.d1.s1")), seqFile1, 0, 31);

    // Unseq file overlapping [20,34] forces the overlapped-page (deserialize) code path.
    TsFileResource unseqFile1 = createEmptyFileAndResource(false);
    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(unseqFile1)) {
      writer.startChunkGroup("d1");
      writer.generateSimpleAlignedSeriesToCurrentDevice(
          Arrays.asList("s1", "s2", "s3"),
          new TimeRange[][] {new TimeRange[] {new TimeRange(20, 34)}},
          TSEncoding.RLE,
          CompressionType.UNCOMPRESSED);
      writer.endChunkGroup();
      writer.endFile();
    }
    unseqFile1.updateStartTime(device, 20);
    unseqFile1.updateEndTime(device, 34);
    unseqFile1.serialize();

    CrossSpaceCompactionTask task =
        new CrossSpaceCompactionTask(
            0,
            tsFileManager,
            Arrays.asList(seqFile1),
            Arrays.asList(unseqFile1),
            new FastCompactionPerformer(true),
            new AtomicInteger(0),
            0,
            0);
    try {
      Assert.assertTrue(task.start());
    } catch (Exception e) {
      // Preserve the cause in the failure message instead of swallowing it.
      Assert.fail("compaction task threw: " + e);
    }

    // The deleted range [10,19] is gone; surviving data spans [20,50].
    TsFileResource result = tsFileManager.getTsFileList(true).get(0);
    result.buildDeviceTimeIndex();
    Assert.assertEquals(20, result.getStartTime(device));
    Assert.assertEquals(50, result.getEndTime(device));
    validateSeqFiles(true);

    // Verify s1's chunk-level time bounds in the compacted file match the surviving range.
    try (TsFileSequenceReader reader = new TsFileSequenceReader(result.getTsFilePath())) {
      Map<String, List<ChunkMetadata>> chunkMetadataInDevice =
          reader.readChunkMetadataInDevice(device);
      long startTime = Long.MAX_VALUE, endTime = Long.MIN_VALUE;
      List<ChunkMetadata> chunkMetadataList = chunkMetadataInDevice.get("s1");
      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
        startTime = Math.min(startTime, chunkMetadata.getStartTime());
        endTime = Math.max(endTime, chunkMetadata.getEndTime());
      }
      Assert.assertEquals(20, startTime);
      Assert.assertEquals(50, endTime);
    }
  }
}
......@@ -31,12 +31,13 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class CompactionTestFileWriter {
public class CompactionTestFileWriter implements Closeable {
private TsFileResource resource;
private TsFileIOWriter fileWriter;
......@@ -170,6 +171,34 @@ public class CompactionTestFileWriter {
}
}
/**
 * Writes one aligned chunk per {@link TimeRange} to the current device, with one INT32 point per
 * timestamp in the range. Measurements whose flag in {@code nullMeasurements} is {@code true} are
 * written as null on every timestamp, producing value chunks that contain no real data.
 *
 * @param measurementNames aligned measurement names (parallel to {@code nullMeasurements})
 * @param toGenerateChunkTimeRanges one chunk is generated per time range
 * @param encoding encoding applied to every measurement
 * @param compressionType compression applied to every measurement
 * @param nullMeasurements per-measurement flag: true means write null for that measurement
 * @throws IOException if the underlying file writer fails
 */
public void generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
    List<String> measurementNames,
    TimeRange[] toGenerateChunkTimeRanges,
    TSEncoding encoding,
    CompressionType compressionType,
    List<Boolean> nullMeasurements)
    throws IOException {
  List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
  for (String measurementName : measurementNames) {
    measurementSchemas.add(
        new MeasurementSchema(measurementName, TSDataType.INT32, encoding, compressionType));
  }
  // Reuse one Random instance instead of allocating a new one for every written point.
  Random random = new Random();
  for (TimeRange toGenerateChunk : toGenerateChunkTimeRanges) {
    AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
    currentDeviceStartTime = Math.min(toGenerateChunk.getMin(), currentDeviceStartTime);
    currentDeviceEndTime = Math.max(toGenerateChunk.getMax(), currentDeviceEndTime);
    for (long time = toGenerateChunk.getMin(); time <= toGenerateChunk.getMax(); time++) {
      alignedChunkWriter.getTimeChunkWriter().write(time);
      for (int i = 0; i < measurementNames.size(); i++) {
        alignedChunkWriter
            .getValueChunkWriterByIndex(i)
            .write(time, random.nextInt(), nullMeasurements.get(i));
      }
    }
    alignedChunkWriter.writeToFileWriter(fileWriter);
  }
}
public void generateSimpleAlignedSeriesToCurrentDevice(
List<String> measurementNames,
TimeRange[][] toGenerateChunkPageTimeRanges,
......@@ -203,6 +232,40 @@ public class CompactionTestFileWriter {
}
}
/**
 * Writes aligned chunks to the current device with explicit page boundaries: each outer
 * {@code TimeRange[]} becomes one chunk, and each inner {@link TimeRange} becomes one sealed page
 * inside it. Measurements flagged in {@code nullMeasurement} are written as null on every
 * timestamp, producing pages with no surviving values for those measurements.
 *
 * @param measurementNames aligned measurement names (parallel to {@code nullMeasurement})
 * @param toGenerateChunkPageTimeRanges [chunk][page] time ranges
 * @param encoding encoding applied to every measurement
 * @param compressionType compression applied to every measurement
 * @param nullMeasurement per-measurement flag: true means write null for that measurement
 * @throws IOException if the underlying file writer fails
 */
public void generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
    List<String> measurementNames,
    TimeRange[][] toGenerateChunkPageTimeRanges,
    TSEncoding encoding,
    CompressionType compressionType,
    List<Boolean> nullMeasurement)
    throws IOException {
  List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
  for (String measurementName : measurementNames) {
    measurementSchemas.add(
        new MeasurementSchema(measurementName, TSDataType.INT32, encoding, compressionType));
  }
  // Reuse one Random instance instead of allocating a new one for every written point.
  Random random = new Random();
  for (TimeRange[] toGenerateChunk : toGenerateChunkPageTimeRanges) {
    AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
    for (TimeRange toGeneratePageTimeRange : toGenerateChunk) {
      currentDeviceStartTime = Math.min(toGeneratePageTimeRange.getMin(), currentDeviceStartTime);
      currentDeviceEndTime = Math.max(toGeneratePageTimeRange.getMax(), currentDeviceEndTime);
      for (long time = toGeneratePageTimeRange.getMin();
          time <= toGeneratePageTimeRange.getMax();
          time++) {
        alignedChunkWriter.write(time);
        for (int i = 0; i < measurementNames.size(); i++) {
          alignedChunkWriter
              .getValueChunkWriterByIndex(i)
              .getPageWriter()
              .write(time, random.nextInt(), nullMeasurement.get(i));
        }
      }
      // Close the current page so each inner TimeRange maps to exactly one page.
      alignedChunkWriter.sealCurrentPage();
    }
    alignedChunkWriter.writeToFileWriter(fileWriter);
  }
}
public void generateSimpleAlignedSeriesToCurrentDevice(
List<String> measurementNames,
TimeRange[][][] toGenerateChunkPageTimeRanges,
......@@ -235,4 +298,38 @@ public class CompactionTestFileWriter {
alignedChunkWriter.writeToFileWriter(fileWriter);
}
}
/**
 * Writes aligned chunks to the current device from three-level time ranges: each
 * {@code TimeRange[][]} becomes one chunk, each {@code TimeRange[]} becomes one sealed page, and
 * each {@link TimeRange} contributes one point per timestamp to that page. Measurements flagged
 * in {@code nullMeasurements} are written as null on every timestamp.
 *
 * @param measurementNames aligned measurement names (parallel to {@code nullMeasurements})
 * @param toGenerateChunkPageTimeRanges [chunk][page][point-range] time ranges
 * @param encoding encoding applied to every measurement
 * @param compressionType compression applied to every measurement
 * @param nullMeasurements per-measurement flag: true means write null for that measurement
 * @throws IOException if the underlying file writer fails
 */
public void generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
    List<String> measurementNames,
    TimeRange[][][] toGenerateChunkPageTimeRanges,
    TSEncoding encoding,
    CompressionType compressionType,
    List<Boolean> nullMeasurements)
    throws IOException {
  List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
  for (String measurementName : measurementNames) {
    measurementSchemas.add(
        new MeasurementSchema(measurementName, TSDataType.INT32, encoding, compressionType));
  }
  // Reuse one Random instance instead of allocating a new one for every written point.
  Random random = new Random();
  for (TimeRange[][] toGenerateChunk : toGenerateChunkPageTimeRanges) {
    AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
    for (TimeRange[] toGeneratePageTimeRanges : toGenerateChunk) {
      for (TimeRange pointsTimeRange : toGeneratePageTimeRanges) {
        currentDeviceStartTime = Math.min(pointsTimeRange.getMin(), currentDeviceStartTime);
        currentDeviceEndTime = Math.max(pointsTimeRange.getMax(), currentDeviceEndTime);
        for (long time = pointsTimeRange.getMin(); time <= pointsTimeRange.getMax(); time++) {
          alignedChunkWriter.write(time);
          for (int i = 0; i < measurementNames.size(); i++) {
            alignedChunkWriter
                .getValueChunkWriterByIndex(i)
                .getPageWriter()
                .write(time, random.nextInt(), nullMeasurements.get(i));
          }
        }
      }
      // Close the current page so each TimeRange[] maps to exactly one page.
      alignedChunkWriter.sealCurrentPage();
    }
    alignedChunkWriter.writeToFileWriter(fileWriter);
  }
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册