未验证 提交 7a005340 编写于 作者: Z Zhang.Jinrui 提交者: GitHub

add lazy page reader for aligned page reader to avoid huge memory cost when...

add lazy page reader for aligned page reader to avoid huge memory cost when reading rows of aligned timeseries (#10966)
上级 c307a455
......@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.ex
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
......@@ -38,6 +39,10 @@ public class PageElement {
public TsBlock batchData;
// pointReader replaces batchData for aligned pages: it loads data points lazily, which avoids
// the huge memory cost of deserializing the whole page at once
public IPointReader pointReader;
// compressed page data
public ByteBuffer pageData;
......@@ -99,9 +104,9 @@ public class PageElement {
public void deserializePage() throws IOException {
if (iChunkReader instanceof AlignedChunkReader) {
this.batchData =
this.pointReader =
((AlignedChunkReader) iChunkReader)
.readPageData(pageHeader, valuePageHeaders, pageData, valuePageDatas);
.getPagePointReader(pageHeader, valuePageHeaders, pageData, valuePageDatas);
} else {
this.batchData = ((ChunkReader) iChunkReader).readPageData(pageHeader, pageData);
}
......
......@@ -39,7 +39,9 @@ public class PointElement {
if (pageElement.iChunkReader instanceof ChunkReader) {
this.pointReader = pageElement.batchData.getTsBlockSingleColumnIterator();
} else {
this.pointReader = pageElement.batchData.getTsBlockAlignedRowIterator();
// For an aligned page, use the lazy pointReader instead of deserializing all data points, to
// avoid a huge memory cost
this.pointReader = pageElement.pointReader;
}
if (!pointReader.hasNextTimeValuePair()) {
return;
......
......@@ -31,10 +31,10 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.page.AlignedPageReader;
import java.io.IOException;
......@@ -277,7 +277,7 @@ public class AlignedChunkReader implements IChunkReader {
}
/** Read data from compressed page data. Uncompress the page and return a lazy point reader. */
public TsBlock readPageData(
public IPointReader getPagePointReader(
PageHeader timePageHeader,
List<PageHeader> valuePageHeaders,
ByteBuffer compressedTimePageData,
......@@ -323,7 +323,7 @@ public class AlignedChunkReader implements IChunkReader {
false);
alignedPageReader.initTsBlockBuilder(valueTypes);
alignedPageReader.setDeleteIntervalList(valueDeleteIntervalList);
return alignedPageReader.getAllSatisfiedData();
return alignedPageReader.getLazyPointReader();
}
private ByteBuffer uncompressPageData(
......
......@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
......@@ -160,6 +161,10 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
}
}
/**
 * Creates a point reader over this page that builds rows on demand instead of materializing them
 * all at once.
 *
 * @return a new {@link LazyLoadAlignedPagePointReader} wrapping this reader's time page reader
 *     and value page readers
 * @throws IOException if constructing the lazy reader fails; its constructor already tries to
 *     prepare the first row
 */
public IPointReader getLazyPointReader() throws IOException {
return new LazyLoadAlignedPagePointReader(timePageReader, valuePageReaderList);
}
@Override
public TsBlock getAllSatisfiedData() throws IOException {
builder.reset();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.read.reader.page;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.util.List;
/**
 * Point reader that walks the rows of an aligned series one at a time. Instead of deserializing
 * every data point of a page into memory up front, it assembles a single row only when the
 * previous one has been consumed.
 */
public class LazyLoadAlignedPagePointReader implements IPointReader {

  private final TimePageReader timeReader;
  private final List<ValuePageReader> valueReaders;

  // true while currentTime/currentRow hold a prepared row that has not been consumed yet
  private boolean hasNextRow = false;
  // index of the most recently read timestamp; -1 until the first timestamp is read
  private int timeIndex = -1;
  private long currentTime;
  private TsPrimitiveType currentRow;

  /**
   * Wraps the given page readers and immediately prepares the first row.
   *
   * @param timeReader reader supplying the timestamps
   * @param valueReaders one reader per value column; an entry may be {@code null}, in which case
   *     that column is always {@code null} in the produced rows
   * @throws IOException if preparing the first row fails
   */
  public LazyLoadAlignedPagePointReader(
      TimePageReader timeReader, List<ValuePageReader> valueReaders) throws IOException {
    this.timeReader = timeReader;
    this.valueReaders = valueReaders;
    prepareNextRow();
  }

  /**
   * Advances to the next timestamp that has at least one non-null column value and stores it as
   * the current row. Timestamps whose columns are all null are skipped. When the time reader is
   * exhausted, {@code hasNextRow} is cleared.
   */
  private void prepareNextRow() throws IOException {
    while (timeReader.hasNextTime()) {
      currentTime = timeReader.nextTime();
      timeIndex++;

      TsPrimitiveType[] columns = new TsPrimitiveType[valueReaders.size()];
      boolean rowHasValue = false;
      int column = 0;
      for (ValuePageReader columnReader : valueReaders) {
        TsPrimitiveType cell = null;
        if (columnReader != null) {
          cell = columnReader.nextValue(currentTime, timeIndex);
        }
        if (cell != null) {
          rowHasValue = true;
        }
        columns[column++] = cell;
      }

      if (rowHasValue) {
        currentRow = new TsPrimitiveType.TsVector(columns);
        hasNextRow = true;
        return;
      }
    }
    // no further timestamp yields a row with data
    hasNextRow = false;
  }

  /** Returns whether a prepared row is waiting to be read. */
  @Override
  public boolean hasNextTimeValuePair() throws IOException {
    return hasNextRow;
  }

  /** Returns the current row and then moves on to the following one. */
  @Override
  public TimeValuePair nextTimeValuePair() throws IOException {
    TimeValuePair row = currentTimeValuePair();
    prepareNextRow();
    return row;
  }

  /** Returns the current row, wrapped in a new pair, without advancing the reader. */
  @Override
  public TimeValuePair currentTimeValuePair() throws IOException {
    return new TimeValuePair(currentTime, currentRow);
  }

  /** Does nothing. */
  @Override
  public void close() throws IOException {}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.read.reader;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.page.LazyLoadAlignedPagePointReader;
import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
/** Unit tests for {@link LazyLoadAlignedPagePointReader}, driven by mocked page readers. */
public class LazyLoadAlignedPagePointReaderTest {

  private static final int COLUMN_COUNT = 2;

  /** Builds a list holding {@code count} mocked value page readers. */
  private static List<ValuePageReader> mockValueReaders(int count) {
    List<ValuePageReader> readers = new LinkedList<>();
    for (int i = 0; i < count; i++) {
      readers.add(PowerMockito.mock(ValuePageReader.class));
    }
    return readers;
  }

  /** Stubs every given reader so that each call to {@code nextValue} yields {@code null}. */
  private static void stubAllValuesNull(List<ValuePageReader> readers) throws IOException {
    for (ValuePageReader reader : readers) {
      PowerMockito.when(reader.nextValue(Mockito.anyLong(), Mockito.anyInt())).thenReturn(null);
    }
  }

  /** A time reader with no timestamps yields no row. */
  @Test
  public void testTimeNoData() throws IOException {
    TimePageReader timeReader = PowerMockito.mock(TimePageReader.class);
    List<ValuePageReader> valueReaders = mockValueReaders(COLUMN_COUNT);

    PowerMockito.when(timeReader.hasNextTime()).thenReturn(false);
    stubAllValuesNull(valueReaders);

    LazyLoadAlignedPagePointReader reader =
        new LazyLoadAlignedPagePointReader(timeReader, valueReaders);

    Assert.assertFalse(reader.hasNextTimeValuePair());
  }

  /** A timestamp whose columns are all null does not produce a row. */
  @Test
  public void testValueNoData() throws IOException {
    TimePageReader timeReader = PowerMockito.mock(TimePageReader.class);
    List<ValuePageReader> valueReaders = mockValueReaders(COLUMN_COUNT);

    PowerMockito.when(timeReader.hasNextTime()).thenReturn(true).thenReturn(false);
    stubAllValuesNull(valueReaders);

    LazyLoadAlignedPagePointReader reader =
        new LazyLoadAlignedPagePointReader(timeReader, valueReaders);

    Assert.assertFalse(reader.hasNextTimeValuePair());
  }

  /** A single fully populated row is returned once, after which the reader is exhausted. */
  @Test
  public void testOneRow() throws IOException {
    TimePageReader timeReader = PowerMockito.mock(TimePageReader.class);
    List<ValuePageReader> valueReaders = mockValueReaders(COLUMN_COUNT);
    ValuePageReader firstColumn = valueReaders.get(0);
    ValuePageReader secondColumn = valueReaders.get(1);

    PowerMockito.when(timeReader.hasNextTime()).thenReturn(true).thenReturn(false);
    PowerMockito.when(timeReader.nextTime()).thenReturn(1L);
    PowerMockito.when(firstColumn.nextValue(Mockito.anyLong(), Mockito.anyInt()))
        .thenReturn(new TsPrimitiveType.TsInt(1));
    PowerMockito.when(secondColumn.nextValue(Mockito.anyLong(), Mockito.anyInt()))
        .thenReturn(new TsPrimitiveType.TsInt(2));

    LazyLoadAlignedPagePointReader reader =
        new LazyLoadAlignedPagePointReader(timeReader, valueReaders);

    Assert.assertTrue(reader.hasNextTimeValuePair());

    TimeValuePair row = reader.nextTimeValuePair();
    TsPrimitiveType expectedRow =
        new TsPrimitiveType.TsVector(
            new TsPrimitiveType.TsInt[] {
              new TsPrimitiveType.TsInt(1), new TsPrimitiveType.TsInt(2)
            });
    Assert.assertEquals(1L, row.getTimestamp());
    Assert.assertEquals(expectedRow, row.getValue());
    Assert.assertFalse(reader.hasNextTimeValuePair());
  }

  /** A partially null row is kept, while the following all-null row is skipped. */
  @Test
  public void testSomeColumnNull() throws IOException {
    TimePageReader timeReader = PowerMockito.mock(TimePageReader.class);
    List<ValuePageReader> valueReaders = mockValueReaders(COLUMN_COUNT);
    ValuePageReader firstColumn = valueReaders.get(0);
    ValuePageReader secondColumn = valueReaders.get(1);

    PowerMockito.when(timeReader.hasNextTime())
        .thenReturn(true)
        .thenReturn(true)
        .thenReturn(false);
    PowerMockito.when(timeReader.nextTime()).thenReturn(1L).thenReturn(2L);
    PowerMockito.when(firstColumn.nextValue(Mockito.anyLong(), Mockito.anyInt()))
        .thenReturn(new TsPrimitiveType.TsInt(1))
        .thenReturn(null);
    PowerMockito.when(secondColumn.nextValue(Mockito.anyLong(), Mockito.anyInt()))
        .thenReturn(null)
        .thenReturn(null);

    LazyLoadAlignedPagePointReader reader =
        new LazyLoadAlignedPagePointReader(timeReader, valueReaders);

    Assert.assertTrue(reader.hasNextTimeValuePair());

    TimeValuePair row = reader.nextTimeValuePair();
    Assert.assertEquals(1L, row.getTimestamp());
    Assert.assertEquals("[1, null]", row.getValue().toString());
    Assert.assertFalse(reader.hasNextTimeValuePair());
  }

  /** Two consecutive rows with data are returned in timestamp order. */
  @Test
  public void testMultiRow() throws IOException {
    TimePageReader timeReader = PowerMockito.mock(TimePageReader.class);
    List<ValuePageReader> valueReaders = mockValueReaders(COLUMN_COUNT);
    ValuePageReader firstColumn = valueReaders.get(0);
    ValuePageReader secondColumn = valueReaders.get(1);

    PowerMockito.when(timeReader.hasNextTime())
        .thenReturn(true)
        .thenReturn(true)
        .thenReturn(false);
    PowerMockito.when(timeReader.nextTime()).thenReturn(1L).thenReturn(2L);
    PowerMockito.when(firstColumn.nextValue(Mockito.anyLong(), Mockito.anyInt()))
        .thenReturn(new TsPrimitiveType.TsInt(1))
        .thenReturn(new TsPrimitiveType.TsInt(1));
    PowerMockito.when(secondColumn.nextValue(Mockito.anyLong(), Mockito.anyInt()))
        .thenReturn(null)
        .thenReturn(new TsPrimitiveType.TsInt(2));

    LazyLoadAlignedPagePointReader reader =
        new LazyLoadAlignedPagePointReader(timeReader, valueReaders);

    Assert.assertTrue(reader.hasNextTimeValuePair());

    TimeValuePair firstRow = reader.nextTimeValuePair();
    Assert.assertEquals(1L, firstRow.getTimestamp());
    Assert.assertEquals("[1, null]", firstRow.getValue().toString());
    Assert.assertTrue(reader.hasNextTimeValuePair());

    TimeValuePair secondRow = reader.nextTimeValuePair();
    Assert.assertEquals(2L, secondRow.getTimestamp());
    Assert.assertEquals("[1, 2]", secondRow.getValue().toString());
    Assert.assertFalse(reader.hasNextTimeValuePair());
  }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册