未验证 提交 ba76c4bf 编写于 作者: J Jialin Qiao 提交者: GitHub

refactor chunk reader (#680)

* optimize PageReader and ChunkReader & refactor BatchData
上级 777dd00c
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# 一、工作流程
## 主要链接
......
......@@ -85,15 +85,13 @@ public class TsFileSequenceRead {
System.out
.println("\t\tUncompressed page data size: " + pageHeader.getUncompressedSize());
PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder,
defaultTimeDecoder);
while (reader1.hasNextBatch()) {
BatchData batchData = reader1.nextBatch();
while (batchData.hasCurrent()) {
System.out.println(
"\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
.currentValue());
batchData.next();
}
defaultTimeDecoder, null);
BatchData batchData = reader1.getAllSatisfiedPageData();
while (batchData.hasCurrent()) {
System.out.println(
"\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
.currentValue());
batchData.next();
}
}
break;
......
......@@ -49,7 +49,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
......@@ -401,9 +400,9 @@ class MergeMultiChunkTask {
private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
long chunkLimitTime, int pathIdx) throws IOException {
int cnt = 0;
AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
while (chunkReader.hasNextBatch()) {
BatchData batchData = chunkReader.nextBatch();
ChunkReader chunkReader = new ChunkReader(chunk, null);
while (chunkReader.hasNextSatisfiedPage()) {
BatchData batchData = chunkReader.nextPageData();
cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
}
cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
......
......@@ -24,16 +24,16 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
public class CachedDiskChunkReader implements IPointReader {
private AbstractChunkReader chunkReader;
private ChunkReader chunkReader;
private BatchData data;
private TimeValuePair prev;
private TimeValuePair current;
public CachedDiskChunkReader(AbstractChunkReader chunkReader) {
public CachedDiskChunkReader(ChunkReader chunkReader) {
this.chunkReader = chunkReader;
this.prev =
TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
......@@ -44,8 +44,8 @@ public class CachedDiskChunkReader implements IPointReader {
if (data != null && data.hasCurrent()) {
return true;
}
while (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
if (data.hasCurrent()) {
return true;
}
......@@ -60,8 +60,8 @@ public class CachedDiskChunkReader implements IPointReader {
if (data.hasCurrent()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
} else {
while (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
if (data.hasCurrent()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
break;
......
......@@ -26,7 +26,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
......@@ -71,7 +70,7 @@ public class ChunkReaderWrap {
public IPointReader getIPointReader() throws IOException {
if (type.equals(ChunkReaderType.DISK_CHUNK)) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
AbstractChunkReader chunkReader = new ChunkReader(chunk, filter);
ChunkReader chunkReader = new ChunkReader(chunk, filter);
return new DiskChunkReader(chunkReader);
} else {
return new MemChunkReader(readOnlyMemChunk, filter);
......
......@@ -20,16 +20,15 @@ package org.apache.iotdb.db.query.reader.chunkRelated;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
/**
* To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
* data reader {@link AbstractChunkReader}.
* data reader {@link ChunkReader}.
* <p>
* Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
* are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
......@@ -37,10 +36,10 @@ import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
*/
public class DiskChunkReader implements IPointReader, IBatchReader {
private AbstractChunkReader chunkReader;
private ChunkReader chunkReader;
private BatchData data;
public DiskChunkReader(AbstractChunkReader chunkReader) {
public DiskChunkReader(ChunkReader chunkReader) {
this.chunkReader = chunkReader;
}
......@@ -49,8 +48,8 @@ public class DiskChunkReader implements IPointReader, IBatchReader {
if (data != null && data.hasCurrent()) {
return true;
}
while (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
if (data.hasCurrent()) {
return true;
}
......
......@@ -54,8 +54,8 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
return null;
} else {
chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
if (chunkReaderByTimestamp.hasNextBatch()) {
data = chunkReaderByTimestamp.nextBatch();
if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
data = chunkReaderByTimestamp.nextPageData();
} else {
return null;
}
......@@ -70,8 +70,8 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
if (data != null && data.hasCurrent()) {
return true;
}
if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) {
data = chunkReaderByTimestamp.nextBatch();
if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
data = chunkReaderByTimestamp.nextPageData();
return true;
}
return false;
......
......@@ -92,7 +92,7 @@ public class MemChunkReader implements IPointReader, IAggregateReader {
@Override
public BatchData nextBatch() {
BatchData batchData = new BatchData(dataType, true);
BatchData batchData = new BatchData(dataType);
if (hasCachedTimeValuePair) {
hasCachedTimeValuePair = false;
batchData.putTime(cachedTimeValuePair.getTimestamp());
......
......@@ -25,7 +25,6 @@ import org.apache.iotdb.db.query.reader.chunkRelated.CachedDiskChunkReader;
import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
......@@ -35,7 +34,7 @@ public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
super(dataType);
int priorityValue = 1;
for (Chunk chunk : chunks) {
AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
ChunkReader chunkReader = new ChunkReader(chunk, null);
addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
}
}
......
......@@ -88,13 +88,15 @@ public class NewUnseqResourceMergeReader implements IBatchReader {
if (tsFileResource.isClosed()) {
// get chunk metadata list of current closed tsfile
currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);
// get modifications and apply to chunk metadatas
List<Modification> pathModifications = context
.getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
if (!pathModifications.isEmpty()) {
QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
}
} else {
// metadata list of already flushed chunk groups
// metadata list of already flushed chunks in unsealed file, already applied modifications
currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
}
......@@ -143,7 +145,7 @@ public class NewUnseqResourceMergeReader implements IBatchReader {
return true;
}
batchData = new BatchData(dataType, true);
batchData = new BatchData(dataType);
for (int rowCount = 0; rowCount < batchSize; rowCount++) {
if (priorityMergeReader.hasNext()) {
......
......@@ -132,27 +132,6 @@ public class SeqResourceIterateReader extends IterateReader {
}
}
/**
* Returns true if the start and end time of the series data in this sequence TsFile do not
* satisfy the filter condition. Returns false if satisfy.
* <p>
* This method is used to in <code>constructNextReader</code> to check whether this TsFile can be
* skipped.
*
* @param tsFile the TsFileResource corresponding to this TsFile
* @param filter filter condition. Null if no filter.
* @return True if the TsFile's start and end time do not satisfy the filter condition; False if
* satisfy.
*/
private boolean isTsFileNotSatisfied(TsFileResource tsFile, Filter filter) {
if (filter == null) {
return false;
}
long startTime = tsFile.getStartTimeMap().get(seriesPath.getDevice());
long endTime = tsFile.getEndTimeMap().get(seriesPath.getDevice());
return !filter.satisfyStartEndTime(startTime, endTime);
}
private IAggregateReader initSealedTsFileReader(TsFileResource sealedTsFile, Filter filter,
QueryContext context) throws IOException {
// prepare metaDataList
......
......@@ -133,7 +133,7 @@ public class SeriesReaderWithoutValueFilter implements IBatchReader, IPointReade
if (hasNextInSeq() && hasNextInUnSeq()) {
// if the count reaches batch data size
int count = 0;
BatchData batchData = new BatchData(seqBatchData.getDataType(), true);
BatchData batchData = new BatchData(seqBatchData.getDataType());
while (count < batchSize && hasNextInSeq() && hasNextInUnSeq()) {
long timeInSeq = seqBatchData.currentTime();
long timeInUnseq = unseqBatchData.currentTime();
......
......@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
......@@ -101,10 +100,10 @@ public class MergeUtils {
}
public static int writeChunkWithoutUnseq(Chunk chunk, IChunkWriter chunkWriter) throws IOException {
AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
ChunkReader chunkReader = new ChunkReader(chunk, null);
int ptWritten = 0;
while (chunkReader.hasNextBatch()) {
BatchData batchData = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
BatchData batchData = chunkReader.nextPageData();
for (int i = 0; i < batchData.length(); i++) {
writeBatchPoint(batchData, i, chunkWriter);
}
......
......@@ -21,7 +21,9 @@ package org.apache.iotdb.db.engine.storagegroup;
import static org.junit.Assert.assertFalse;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
......@@ -29,13 +31,17 @@ import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.junit.After;
......@@ -71,6 +77,43 @@ public class StorageGroupProcessorTest {
}
@Test
public void testUnseqUnsealedDelete() throws QueryProcessException, IOException {
TSRecord record = new TSRecord(10000, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
processor.insert(new InsertPlan(record));
processor.waitForAllCurrentTsFileProcessorsClosed();
for (int j = 1; j <= 10; j++) {
record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
}
processor.getWorkUnSequenceTsFileProcessor().syncFlush();
for (int j = 11; j <= 20; j++) {
record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
}
processor.delete(deviceId, measurementId, 15L);
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor.getWorkUnSequenceTsFileProcessor()
.query(deviceId, measurementId, TSDataType.INT32, Collections.emptyMap(), new QueryContext());
List<TimeValuePair> timeValuePairs = pair.left.getSortedTimeValuePairList();
long time = 16;
for (TimeValuePair timeValuePair : timeValuePairs) {
Assert.assertEquals(time++, timeValuePair.getTimestamp());
}
Assert.assertEquals(0, pair.right.size());
}
@Test
public void testSequenceSyncClose() throws QueryProcessException {
for (int j = 1; j <= 10; j++) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.integration;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class IoTDBSimpleQueryTest {
private IoTDB deamon;
@Before
public void setUp() throws Exception {
deamon = IoTDB.getInstance();
deamon.active();
EnvironmentUtils.envSetUp();
}
@After
public void tearDown() throws Exception {
deamon.stop();
EnvironmentUtils.cleanEnv();
}
@Test
public void testUnseqUnsealedDeleteQuery() throws SQLException, ClassNotFoundException {
Class.forName(Config.JDBC_DRIVER_NAME);
try(Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
"root", "root");
Statement statement = connection.createStatement()){
statement.execute("SET STORAGE GROUP TO root.sg1");
statement.execute("CREATE TIMESERIES root.sg1.d0.s0 WITH DATATYPE=INT32,ENCODING=PLAIN");
// seq data
statement.execute("INSERT INTO root.sg1.d0(timestamp, s0) VALUES (1000, 1)");
statement.execute("flush");
for (int i = 1; i <= 10; i++) {
statement.execute(
String.format("INSERT INTO root.sg1.d0(timestamp, s0) VALUES (%d, %d)", i, i));
}
statement.execute("flush");
// unseq data
for (int i = 11; i <= 20; i++) {
statement.execute(
String.format("INSERT INTO root.sg1.d0(timestamp, s0) VALUES (%d, %d)", i, i));
}
statement.execute("delete from root.sg1.d0.s0 where time <= 15");
ResultSet resultSet = statement.executeQuery("select * from root");
long count = 0;
while(resultSet.next()) {
count++;
}
System.out.println(count);
}
}
}
......@@ -90,7 +90,7 @@ public class FakedIBatchPoint implements IBatchReader {
if (!hasEmptyBatch) {
num += 1;
}
batchData = new BatchData(TSDataType.INT64, true);
batchData = new BatchData(TSDataType.INT64);
while (num > 0 && iterator.hasNext()) {
TimeValuePair timeValuePair = iterator.next();
batchData.putTime(timeValuePair.getTimestamp());
......
......@@ -46,7 +46,6 @@ import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
......@@ -165,7 +164,7 @@ public class UnseqTsFileRecoverTest {
int priorityValue = 1;
for (ChunkMetaData chunkMetaData : metadataQuerier.getChunkMetaDataList(path)) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
ChunkReader chunkReader = new ChunkReader(chunk, null);
unSeqMergeReader
.addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue);
priorityValue++;
......
......@@ -32,7 +32,12 @@ import java.util.ArrayList;
* <code>BatchData</code> is a self-defined data structure which is optimized for different type of
* values. This class can be viewed as a collection which is more efficient than ArrayList.
*
* We don't return empty batch data. If you get a batch data, you can iterate the data as the following codes:
* This class records a time list and a value list, which could be replaced by TVList in the future
*
* When you use BatchData in query process, it does not contain duplicated timestamps. The batch data
* may be empty.
*
* If you get a batch data, you can iterate the data as the following codes:
*
* while (batchData.hasCurrent()) {
* long time = batchData.currentTime();
......@@ -45,7 +50,6 @@ public class BatchData implements Serializable {
private static final long serialVersionUID = -4620310601188394839L;
private int timeCapacity = 16;
private int valueCapacity = 16;
private int emptyTimeCapacity = 1;
private int capacityThreshold = 1024;
private TSDataType dataType;
......@@ -78,7 +82,6 @@ public class BatchData implements Serializable {
private int valueLength;
private ArrayList<long[]> timeRet;
private ArrayList<long[]> emptyTimeRet;
private ArrayList<boolean[]> booleanRet;
private ArrayList<int[]> intRet;
private ArrayList<long[]> longRet;
......@@ -90,22 +93,13 @@ public class BatchData implements Serializable {
dataType = null;
}
public BatchData(TSDataType type) {
dataType = type;
}
/**
* BatchData Constructor.
*
* @param type Data type to record for this BatchData
* @param recordTime whether to record time value for this BatchData
*/
public BatchData(TSDataType type, boolean recordTime) {
init(type, recordTime, false);
}
public BatchData(TSDataType type, boolean recordTime, boolean hasEmptyTime) {
init(type, recordTime, hasEmptyTime);
public BatchData(TSDataType type) {
init(type);
}
public boolean isEmpty() {
......@@ -176,10 +170,8 @@ public class BatchData implements Serializable {
* initialize batch data.
*
* @param type TSDataType
* @param recordTime if record time
* @param hasEmptyTime if has empty time
*/
public void init(TSDataType type, boolean recordTime, boolean hasEmptyTime) {
public void init(TSDataType type) {
this.dataType = type;
this.valueArrayIdx = 0;
this.curValueIdx = 0;
......@@ -187,18 +179,11 @@ public class BatchData implements Serializable {
this.curIdx = 0;
capacityThreshold = TSFileConfig.DYNAMIC_DATA_SIZE;
if (recordTime) {
timeRet = new ArrayList<>();
timeRet.add(new long[timeCapacity]);
timeArrayIdx = 0;
curTimeIdx = 0;
count = 0;
}
if (hasEmptyTime) {
emptyTimeRet = new ArrayList<>();
emptyTimeRet.add(new long[emptyTimeCapacity]);
}
timeRet = new ArrayList<>();
timeRet.add(new long[timeCapacity]);
timeArrayIdx = 0;
curTimeIdx = 0;
count = 0;
switch (dataType) {
case BOOLEAN:
......@@ -422,13 +407,6 @@ public class BatchData implements Serializable {
}
}
private void rangeCheckForEmptyTime(int idx) {
if (idx < 0) {
throw new IndexOutOfBoundsException(
"BatchData empty time range check, Index is negative: " + idx);
}
}
public boolean getBoolean() {
rangeCheck(curIdx);
return this.booleanRet.get(curIdx / timeCapacity)[curIdx % timeCapacity];
......@@ -493,24 +471,6 @@ public class BatchData implements Serializable {
this.timeRet.get(idx / timeCapacity)[idx % timeCapacity] = v;
}
public long getEmptyTime(int idx) {
rangeCheckForEmptyTime(idx);
return this.emptyTimeRet.get(idx / emptyTimeCapacity)[idx % emptyTimeCapacity];
}
/**
* get time as array in long[] structure.
*
* @return time array
*/
public long[] getTimeAsArray() {
long[] res = new long[count];
for (int i = 0; i < count; i++) {
res[i] = timeRet.get(i / timeCapacity)[i % timeCapacity];
}
return res;
}
/**
* put an object.
*
......@@ -541,45 +501,10 @@ public class BatchData implements Serializable {
}
}
/**
* set an object.
*
* @param idx object id
* @param v object value
*/
public void setAnObject(int idx, Comparable<?> v) {
switch (dataType) {
case BOOLEAN:
setBoolean(idx, (Boolean) v);
break;
case DOUBLE:
setDouble(idx, (Double) v);
break;
case TEXT:
setBinary(idx, (Binary) v);
break;
case FLOAT:
setFloat(idx, (Float) v);
break;
case INT32:
setInt(idx, (Integer) v);
break;
case INT64:
setLong(idx, (Long) v);
break;
default:
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
}
public int length() {
return this.count;
}
public int getCurIdx() {
return curIdx;
}
public long getTimeByIndex(int idx) {
rangeCheckForTime(idx);
return this.timeRet.get(idx / timeCapacity)[idx % timeCapacity];
......@@ -615,25 +540,6 @@ public class BatchData implements Serializable {
return booleanRet.get(idx / timeCapacity)[idx % timeCapacity];
}
public Object getValueByIndex(int idx) {
switch (dataType) {
case INT32:
return getIntByIndex(idx);
case INT64:
return getLongByIndex(idx);
case FLOAT:
return getFloatByIndex(idx);
case DOUBLE:
return getDoubleByIndex(idx);
case BOOLEAN:
return getBooleanByIndex(idx);
case TEXT:
return getBinaryByIndex(idx);
default:
return null;
}
}
public Object getValueInTimestamp(long time) {
while (hasCurrent()) {
if (currentTime() < time) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.read.reader.chunk;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.encoding.common.EndianType;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
public abstract class AbstractChunkReader implements IBatchReader {
private ChunkHeader chunkHeader;
private ByteBuffer chunkDataBuffer;
private IUnCompressor unCompressor;
private Decoder valueDecoder;
private Decoder timeDecoder = Decoder.getDecoderByType(
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
TSDataType.INT64);
protected Filter filter;
private BatchData data;
private PageHeader pageHeader;
private boolean hasCachedPageHeader;
/**
* Data whose timestamp <= deletedAt should be considered deleted(not be returned).
*/
protected long deletedAt;
/**
* constructor of ChunkReader.
*
* @param chunk input Chunk object
* @param filter filter
*/
public AbstractChunkReader(Chunk chunk, Filter filter) {
this.filter = filter;
this.chunkDataBuffer = chunk.getData();
this.deletedAt = chunk.getDeletedAt();
EndianType endianType = chunk.getEndianType();
chunkHeader = chunk.getHeader();
this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
valueDecoder = Decoder
.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
valueDecoder.setEndianType(endianType);
data = new BatchData(chunkHeader.getDataType());
hasCachedPageHeader = false;
}
/**
* judge if has nextBatch.
*/
public boolean hasNextBatch() {
if (hasCachedPageHeader) {
return true;
}
// construct next satisfied page header
while (chunkDataBuffer.remaining() > 0) {
// deserialize a PageHeader from chunkDataBuffer
pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
// if the current page satisfies
if (pageSatisfied(pageHeader)) {
hasCachedPageHeader = true;
return true;
} else {
skipBytesInStreamByLength(pageHeader.getCompressedSize());
}
}
return false;
}
/**
* get next data batch.
*
* @return next data batch
* @throws IOException IOException
*/
public BatchData nextBatch() throws IOException {
PageReader pageReader = constructPageReaderForNextPage(pageHeader.getCompressedSize());
hasCachedPageHeader = false;
if (pageReader.hasNextBatch()) {
data = pageReader.nextBatch();
return data;
}
return data;
}
public PageHeader nextPageHeader() {
return pageHeader;
}
public void skipPageData() {
skipBytesInStreamByLength(pageHeader.getCompressedSize());
hasCachedPageHeader = false;
}
private void skipBytesInStreamByLength(long length) {
chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
}
public abstract boolean pageSatisfied(PageHeader pageHeader);
private PageReader constructPageReaderForNextPage(int compressedPageBodyLength)
throws IOException {
byte[] compressedPageBody = new byte[compressedPageBodyLength];
// already in memory
if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
throw new IOException(
"unexpected byte read length when read compressedPageBody. Expected:"
+ Arrays.toString(compressedPageBody) + ". Actual:" + chunkDataBuffer
.remaining());
}
chunkDataBuffer.get(compressedPageBody, 0, compressedPageBodyLength);
valueDecoder.reset();
ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
valueDecoder, timeDecoder, filter);
reader.setDeletedAt(deletedAt);
return reader;
}
public void close() {
}
public ChunkHeader getChunkHeader() {
return chunkHeader;
}
}
......@@ -16,28 +16,147 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.read.reader.chunk;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.encoding.common.EndianType;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
public class ChunkReader {
private ChunkHeader chunkHeader;
private ByteBuffer chunkDataBuffer;
private IUnCompressor unCompressor;
private Decoder valueDecoder;
private Decoder timeDecoder = Decoder.getDecoderByType(
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
TSDataType.INT64);
public class ChunkReader extends AbstractChunkReader {
protected Filter filter;
private PageHeader pageHeader;
private boolean hasCachedPageHeader;
/**
* Data whose timestamp <= deletedAt should be considered deleted(not be returned).
*/
protected long deletedAt;
/**
* constructor of ChunkReader.
*
* @param chunk input Chunk object
* @param filter filter
*/
public ChunkReader(Chunk chunk, Filter filter) {
super(chunk, filter);
this.filter = filter;
this.chunkDataBuffer = chunk.getData();
this.deletedAt = chunk.getDeletedAt();
EndianType endianType = chunk.getEndianType();
chunkHeader = chunk.getHeader();
this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
valueDecoder = Decoder
.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
valueDecoder.setEndianType(endianType);
hasCachedPageHeader = false;
}
/**
* judge if has next page whose page header satisfies the filter.
*/
public boolean hasNextSatisfiedPage() {
if (hasCachedPageHeader) {
return true;
}
// construct next satisfied page header
while (chunkDataBuffer.remaining() > 0) {
// deserialize a PageHeader from chunkDataBuffer
pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
// if the current page satisfies
if (pageSatisfied(pageHeader)) {
hasCachedPageHeader = true;
return true;
} else {
skipBytesInStreamByLength(pageHeader.getCompressedSize());
}
}
return false;
}
/**
* get next data batch.
*
* @return next data batch
* @throws IOException IOException
*/
public BatchData nextPageData() throws IOException {
if(hasCachedPageHeader || hasNextSatisfiedPage()) {
PageReader pageReader = constructPageReaderForNextPage(pageHeader);
hasCachedPageHeader = false;
return pageReader.getAllSatisfiedPageData();
} else {
throw new IOException("no next page data");
}
}
public PageHeader nextPageHeader() {
return pageHeader;
}
public void skipPageData() {
skipBytesInStreamByLength(pageHeader.getCompressedSize());
hasCachedPageHeader = false;
}
private void skipBytesInStreamByLength(long length) {
chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
}
@Override
public boolean pageSatisfied(PageHeader pageHeader) {
if (pageHeader.getEndTime() < deletedAt) {
if (pageHeader.getEndTime() <= deletedAt) {
return false;
}
if (filter == null ) {
return true;
} else {
return filter.satisfy(pageHeader.getStatistics());
return filter == null || filter.satisfy(pageHeader.getStatistics());
}
private PageReader constructPageReaderForNextPage(PageHeader pageHeader)
throws IOException {
int compressedPageBodyLength = pageHeader.getCompressedSize();
byte[] compressedPageBody = new byte[compressedPageBodyLength];
// doesn't has a complete page body
if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
throw new IOException("do not has a complete page body. Expected:" + compressedPageBodyLength
+ ". Actual:" + chunkDataBuffer.remaining());
}
chunkDataBuffer.get(compressedPageBody);
valueDecoder.reset();
ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
valueDecoder, timeDecoder, filter);
reader.setDeletedAt(deletedAt);
return reader;
}
public void close() {
}
public ChunkHeader getChunkHeader() {
return chunkHeader;
}
}
......@@ -21,7 +21,7 @@ package org.apache.iotdb.tsfile.read.reader.chunk;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.read.common.Chunk;
public class ChunkReaderByTimestamp extends AbstractChunkReader {
public class ChunkReaderByTimestamp extends ChunkReader {
private long currentTimestamp;
......
......@@ -25,7 +25,9 @@ import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
......@@ -45,32 +47,24 @@ public class PageReader {
/** value column in memory */
private ByteBuffer valueBuffer;
private BatchData data = null;
private Filter filter = null;
private Filter filter;
/** Data whose timestamp <= deletedAt should be considered deleted(not be returned). */
private long deletedAt = Long.MIN_VALUE;
public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
Decoder timeDecoder,
Filter filter) {
this(pageData, dataType, valueDecoder, timeDecoder);
this.filter = filter;
}
public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
Decoder timeDecoder) {
Decoder timeDecoder, Filter filter) {
this.dataType = dataType;
this.valueDecoder = valueDecoder;
this.timeDecoder = timeDecoder;
this.filter = filter;
splitDataToTimeStampAndValue(pageData);
}
/**
* split pageContent into two stream: time and value
*
* @param pageData
* uncompressed bytes size of time column, time column, value column
* @param pageData uncompressed bytes size of time column, time column, value column
*/
private void splitDataToTimeStampAndValue(ByteBuffer pageData) {
int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(pageData);
......@@ -82,30 +76,12 @@ public class PageReader {
valueBuffer.position(timeBufferLength);
}
public boolean hasNextBatch() throws IOException {
return timeDecoder.hasNext(timeBuffer);
}
/**
* may return an empty BatchData
* @return the returned BatchData may be empty, but never be null
*/
public BatchData nextBatch() throws IOException {
if (filter == null) {
data = getAllPageData();
} else {
data = getAllPageDataWithFilter();
}
return data;
}
public BatchData currentBatch() {
return data;
}
private BatchData getAllPageData() throws IOException {
public BatchData getAllSatisfiedPageData() throws IOException {
BatchData pageData = new BatchData(dataType, true);
BatchData pageData = new BatchData(dataType);
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
......@@ -113,42 +89,42 @@ public class PageReader {
switch (dataType) {
case BOOLEAN:
boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
if (timestamp > deletedAt) {
if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBoolean))) {
pageData.putTime(timestamp);
pageData.putBoolean(aBoolean);
}
break;
case INT32:
int anInt = valueDecoder.readInt(valueBuffer);
if (timestamp > deletedAt) {
if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, anInt))) {
pageData.putTime(timestamp);
pageData.putInt(anInt);
}
break;
case INT64:
long aLong = valueDecoder.readLong(valueBuffer);
if (timestamp > deletedAt) {
if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aLong))) {
pageData.putTime(timestamp);
pageData.putLong(aLong);
}
break;
case FLOAT:
float aFloat = valueDecoder.readFloat(valueBuffer);
if (timestamp > deletedAt) {
if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aFloat))) {
pageData.putTime(timestamp);
pageData.putFloat(aFloat);
}
break;
case DOUBLE:
double aDouble = valueDecoder.readDouble(valueBuffer);
if (timestamp > deletedAt) {
if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aDouble))) {
pageData.putTime(timestamp);
pageData.putDouble(aDouble);
}
break;
case TEXT:
Binary aBinary = valueDecoder.readBinary(valueBuffer);
if (timestamp > deletedAt) {
if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBinary))) {
pageData.putTime(timestamp);
pageData.putBinary(aBinary);
}
......@@ -160,86 +136,6 @@ public class PageReader {
return pageData;
}
private BatchData getAllPageDataWithFilter() throws IOException {
BatchData pageData = new BatchData(dataType, true);
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
switch (dataType) {
case BOOLEAN:
readBoolean(pageData, timestamp);
break;
case INT32:
readInt(pageData, timestamp);
break;
case INT64:
readLong(pageData, timestamp);
break;
case FLOAT:
readFloat(pageData, timestamp);
break;
case DOUBLE:
readDouble(pageData, timestamp);
break;
case TEXT:
readText(pageData, timestamp);
break;
default:
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
}
return pageData;
}
private void readBoolean(BatchData pageData, long timestamp) {
boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
if (timestamp > deletedAt && filter.satisfy(timestamp, aBoolean)) {
pageData.putTime(timestamp);
pageData.putBoolean(aBoolean);
}
}
private void readInt(BatchData pageData, long timestamp) {
int anInt = valueDecoder.readInt(valueBuffer);
if (timestamp > deletedAt && filter.satisfy(timestamp, anInt)) {
pageData.putTime(timestamp);
pageData.putInt(anInt);
}
}
private void readLong(BatchData pageData, long timestamp) {
long aLong = valueDecoder.readLong(valueBuffer);
if (timestamp > deletedAt && filter.satisfy(timestamp, aLong)) {
pageData.putTime(timestamp);
pageData.putLong(aLong);
}
}
private void readFloat(BatchData pageData, long timestamp) {
float aFloat = valueDecoder.readFloat(valueBuffer);
if (timestamp > deletedAt && filter.satisfy(timestamp, aFloat)) {
pageData.putTime(timestamp);
pageData.putFloat(aFloat);
}
}
private void readDouble(BatchData pageData, long timestamp) {
double aDouble = valueDecoder.readDouble(valueBuffer);
if (timestamp > deletedAt && filter.satisfy(timestamp, aDouble)) {
pageData.putTime(timestamp);
pageData.putDouble(aDouble);
}
}
private void readText(BatchData pageData, long timestamp) {
Binary aBinary = valueDecoder.readBinary(valueBuffer);
if (timestamp > deletedAt && filter.satisfy(timestamp, aBinary)) {
pageData.putTime(timestamp);
pageData.putBinary(aBinary);
}
}
public void close() {
timeBuffer = null;
......
......@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IAggregateReader;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import java.io.IOException;
import java.util.List;
......@@ -37,7 +37,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
protected IChunkLoader chunkLoader;
protected List<ChunkMetaData> chunkMetaDataList;
protected AbstractChunkReader chunkReader;
protected ChunkReader chunkReader;
private int chunkToRead;
private BatchData data;
......@@ -63,7 +63,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
public boolean hasNextBatch() throws IOException {
// current chunk has additional batch
if (chunkReader != null && chunkReader.hasNextBatch()) {
if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) {
return true;
}
......@@ -75,7 +75,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
// chunk metadata satisfy the condition
initChunkReader(chunkMetaData);
if (chunkReader.hasNextBatch()) {
if (chunkReader.hasNextSatisfiedPage()) {
return true;
}
}
......@@ -87,7 +87,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
* get next batch data.
*/
public BatchData nextBatch() throws IOException {
data = chunkReader.nextBatch();
data = chunkReader.nextPageData();
return data;
}
......
......@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
/**
......@@ -40,7 +40,7 @@ public class FileSeriesReaderByTimestamp {
protected List<ChunkMetaData> chunkMetaDataList;
private int currentChunkIndex = 0;
private AbstractChunkReader chunkReader;
private ChunkReader chunkReader;
private long currentTimestamp;
private BatchData data = null; // current batch data
......@@ -69,8 +69,8 @@ public class FileSeriesReaderByTimestamp {
return null;
}
if (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
if (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
} else {
return null;
}
......@@ -93,8 +93,8 @@ public class FileSeriesReaderByTimestamp {
}
return null;
} else {
if (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
if (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
} else if (!constructNextSatisfiedChunkReader()) {
return null;
}
......@@ -115,16 +115,16 @@ public class FileSeriesReaderByTimestamp {
if (data != null && data.hasCurrent()) {
return true;
}
while (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
if (data != null && data.hasCurrent()) {
return true;
}
}
}
while (constructNextSatisfiedChunkReader()) {
while (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
if (data != null && data.hasCurrent()) {
return true;
}
......
......@@ -95,7 +95,7 @@ public class NodeTest {
public FakedFileSeriesReader(long[] timestamps) {
super(null, null, null);
data = new BatchData(TSDataType.INT32, true);
data = new BatchData(TSDataType.INT32);
for (long time : timestamps) {
data.putTime(time);
}
......
......@@ -171,15 +171,12 @@ public class PageReaderTest {
ByteBuffer page = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array());
PageReader pageReader = new PageReader(page, dataType, decoder,
new DeltaBinaryDecoder.LongDeltaDecoder());
new DeltaBinaryDecoder.LongDeltaDecoder(), null);
int index = 0;
long startTimestamp = System.currentTimeMillis();
BatchData data = null;
if (pageReader.hasNextBatch()) {
data = pageReader.nextBatch();
}
assert data != null;
BatchData data = pageReader.getAllSatisfiedPageData();
Assert.assertNotNull(data);
while (data.hasCurrent()) {
Assert.assertEquals(Long.valueOf(index), (Long) data.currentTime());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册