提交 50f2da57 编写于 作者: R RUI, LEI 提交者: suyue

Refactor the reader package for readability (#241)

* Refactor the reader package for readability

The former code in the org.apache.iotdb.db.query.reader package is
somewhat difficult to read.

Through this refactor, two main tasks are completed:

* the three levels of readers are made explicit: chunkRelated <
resourceRelated < seriesReader

* the two interfaces for creating the unsequence readers in
SeriesReaderFactoryImpl are removed. Instead, two classes UnseqResourceMergeReader
and UnseqResourceReaderByTimestamp are used. This change brings two
benefits: a) SeriesReaderFactoryImpl is thus focused purely on the
creation of series readers and acts as a middleman between query
executor and data reader. b) The classes for sequence and unsequence
data are finally symmetric.

The future work includes further removing duplicated code and writing
better annotations.

* Deprecate the SeriesReaderFactoryImpl

The former code creates the three series related readers (
SeriesReaderWithoutValueFilter, SeriesReaderWithValueFilter and
SeriesReaderByTimestamp) by the help of SeriesReaderFactoryImpl.
However, thanks to the latest refactor of the reader package, it becomes
clear that the creation tasks can be handed over to the constructors
of the three series reader classes respectively. This change brings about
two benefits: 1) the roles of the three series readers are more
clear and controllable since now the inputs of their constructors are not
pre-created readers but the raw resources. 2) the SeriesReaderFactoryImpl
now can be removed or changed to perform other tasks such as code
deduplication.

update

* Add JAVA annotations and improve related tests.

reformat license

* fix the bug in getValueInTimestamp of SeqResourceReaderByTimestamp; further split the subpackage chunkRelated into two subpackages chunkRelated and fileRelated.

* unsealed sequence TsFile's endTimeMap can also be used in constructNextReader of SeqResourceReaderByTimestamp to speed up

* add resource filter step in the resource related reader construction process

* add more annotations for isTsFileNotSatisfied and constructNextReader
上级 72b30e14
......@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
import java.io.IOException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -115,7 +115,7 @@ public abstract class AggregateFunction {
* @throws IOException TsFile data read error
*/
public abstract void calcAggregationUsingTimestamps(long[] timestamps, int length,
IReaderByTimeStamp dataReader) throws IOException;
IReaderByTimestamp dataReader) throws IOException;
/**
* Judge if aggregation results have been calculated. In other words, if the aggregated result
......
......@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -134,7 +134,7 @@ public class CountAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(long[] timestamps, int length,
IReaderByTimeStamp dataReader) throws IOException {
IReaderByTimestamp dataReader) throws IOException {
int cnt = 0;
for (int i = 0; i < length; i++) {
Object value = dataReader.getValueInTimestamp(timestamps[i]);
......
......@@ -24,7 +24,7 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -133,7 +133,7 @@ public class FirstAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(long[] timestamps, int length,
IReaderByTimeStamp dataReader) throws IOException {
IReaderByTimestamp dataReader) throws IOException {
if (resultData.isSetTime()) {
return;
}
......
......@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
......@@ -117,7 +117,7 @@ public class LastAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(long[] timestamps, int length,
IReaderByTimeStamp dataReader) throws IOException {
IReaderByTimestamp dataReader) throws IOException {
long time = -1;
Object lastVal = null;
......
......@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
......@@ -106,7 +106,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
//TODO Consider how to reverse order in dataReader(IReaderByTimeStamp)
@Override
public void calcAggregationUsingTimestamps(long[] timestamps, int length,
IReaderByTimeStamp dataReader) throws IOException {
IReaderByTimestamp dataReader) throws IOException {
long time = -1;
for (int i = 0; i < length; i++) {
Object value = dataReader.getValueInTimestamp(timestamps[i]);
......
......@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -140,7 +140,7 @@ public class MaxValueAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(long[] timestamps, int length,
IReaderByTimeStamp dataReader) throws IOException {
IReaderByTimestamp dataReader) throws IOException {
Comparable<Object> maxVal = null;
for (int i = 0; i < length; i++) {
Object value = dataReader.getValueInTimestamp(timestamps[i]);
......
......@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
......@@ -149,7 +149,7 @@ public class MeanAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(long[] timestamps, int length,
IReaderByTimeStamp dataReader) throws IOException {
IReaderByTimestamp dataReader) throws IOException {
for (int i = 0; i < length; i++) {
Object value = dataReader.getValueInTimestamp(timestamps[i]);
if (value != null) {
......
......@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -134,7 +134,7 @@ public class MinTimeAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(long[] timestamps, int length,
IReaderByTimeStamp dataReader) throws IOException {
IReaderByTimestamp dataReader) throws IOException {
if (resultData.isSetValue()) {
return;
}
......
......@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -135,7 +135,7 @@ public class MinValueAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(long[] timestamps, int length,
IReaderByTimeStamp dataReader) throws IOException {
IReaderByTimestamp dataReader) throws IOException {
Comparable<Object> minVal = null;
for (int i = 0; i < length; i++) {
Object value = dataReader.getValueInTimestamp(timestamps[i]);
......
......@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.dataset;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
......@@ -31,7 +31,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public class EngineDataSetWithValueFilter extends QueryDataSet {
private EngineTimeGenerator timeGenerator;
private List<IReaderByTimeStamp> seriesReaderByTimestampList;
private List<IReaderByTimestamp> seriesReaderByTimestampList;
private boolean hasCachedRowRecord;
private RowRecord cachedRowRecord;
......@@ -44,7 +44,7 @@ public class EngineDataSetWithValueFilter extends QueryDataSet {
* @param readers readers in List(IReaderByTimeStamp) structure
*/
public EngineDataSetWithValueFilter(List<Path> paths, List<TSDataType> dataTypes,
EngineTimeGenerator timeGenerator, List<IReaderByTimeStamp> readers) {
EngineTimeGenerator timeGenerator, List<IReaderByTimestamp> readers) {
super(paths, dataTypes);
this.timeGenerator = timeGenerator;
this.seriesReaderByTimestampList = readers;
......@@ -78,7 +78,7 @@ public class EngineDataSetWithValueFilter extends QueryDataSet {
long timestamp = timeGenerator.next();
RowRecord rowRecord = new RowRecord(timestamp);
for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
IReaderByTimeStamp reader = seriesReaderByTimestampList.get(i);
IReaderByTimestamp reader = seriesReaderByTimestampList.get(i);
Object value = reader.getValueInTimestamp(timestamp);
if (value == null) {
rowRecord.addField(new Field(null));
......@@ -100,7 +100,7 @@ public class EngineDataSetWithValueFilter extends QueryDataSet {
return timeGenerator;
}
public List<IReaderByTimeStamp> getReaders() {
public List<IReaderByTimestamp> getReaders() {
return seriesReaderByTimestampList;
}
}
......@@ -23,14 +23,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
......@@ -40,7 +40,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
private List<IReaderByTimeStamp> allDataReaderList;
private List<IReaderByTimestamp> allDataReaderList;
private TimeGenerator timestampGenerator;
/**
* cached timestamp for next group by partition.
......@@ -77,8 +77,11 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
QueryResourceManager
.getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), selectedSeries);
this.timestampGenerator = new EngineTimeGenerator(expression, context);
this.allDataReaderList = SeriesReaderFactoryImpl.getInstance()
.createSeriesReadersByTimestamp(selectedSeries, context);
this.allDataReaderList = new ArrayList<>();
for (Path path : selectedSeries) {
SeriesReaderByTimestamp seriesReaderByTimestamp = new SeriesReaderByTimestamp(path, context);
allDataReaderList.add(seriesReaderByTimestamp);
}
}
@Override
......
......@@ -19,18 +19,21 @@
package org.apache.iotdb.db.query.dataset.groupby;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.sequence.SequenceSeriesReader;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Field;
......@@ -41,10 +44,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
private List<IPointReader> unSequenceReaderList;
......@@ -86,17 +85,17 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
.getQueryDataSource(path, context);
// sequence reader for sealed tsfile, unsealed tsfile, memory
SequenceSeriesReader sequenceReader = new SequenceSeriesReader(
IAggregateReader seqResourceIterateReader = new SeqResourceIterateReader(
queryDataSource.getSeriesPath(), queryDataSource.getSeqResources(), timeFilter, context,
false);
// unseq reader for all chunk groups in unSeqFile, memory
IPointReader unSeqMergeReader = SeriesReaderFactoryImpl.getInstance()
.createUnseqSeriesReader(queryDataSource.getSeriesPath(), queryDataSource.getUnseqResources(), context,
timeFilter);
IPointReader unseqResourceMergeReader = new UnseqResourceMergeReader(
queryDataSource.getSeriesPath(), queryDataSource.getUnseqResources(), context,
timeFilter);
sequenceReaderList.add(sequenceReader);
unSequenceReaderList.add(unSeqMergeReader);
sequenceReaderList.add(seqResourceIterateReader);
unSequenceReaderList.add(unseqResourceMergeReader);
}
}
......
......@@ -19,11 +19,14 @@
package org.apache.iotdb.db.query.executor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
......@@ -34,11 +37,12 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutValueFilter;
import org.apache.iotdb.db.query.factory.AggreFuncFactory;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.sequence.SequenceSeriesReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
......@@ -49,10 +53,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class AggregateEngineExecutor {
private List<Path> selectedSeries;
......@@ -104,22 +104,21 @@ public class AggregateEngineExecutor {
.getQueryDataSource(selectedSeries.get(i), context);
// sequence reader for sealed tsfile, unsealed tsfile, memory
SequenceSeriesReader sequenceReader;
IAggregateReader seqResourceIterateReader;
if (function instanceof MaxTimeAggrFunc || function instanceof LastAggrFunc) {
sequenceReader = new SequenceSeriesReader(queryDataSource.getSeriesPath(),
seqResourceIterateReader = new SeqResourceIterateReader(queryDataSource.getSeriesPath(),
queryDataSource.getSeqResources(), timeFilter, context, true);
} else {
sequenceReader = new SequenceSeriesReader(queryDataSource.getSeriesPath(),
seqResourceIterateReader = new SeqResourceIterateReader(queryDataSource.getSeriesPath(),
queryDataSource.getSeqResources(), timeFilter, context, false);
}
// unseq reader for all chunk groups in unSeqFile, memory
IPointReader unSeqMergeReader = SeriesReaderFactoryImpl.getInstance()
.createUnseqSeriesReader(queryDataSource.getSeriesPath(), queryDataSource.getUnseqResources(), context,
timeFilter);
IPointReader unseqResourceMergeReader= new UnseqResourceMergeReader(queryDataSource.getSeriesPath(),
queryDataSource.getUnseqResources(), context, timeFilter);
readersOfSequenceData.add(sequenceReader);
readersOfUnSequenceData.add(unSeqMergeReader);
readersOfSequenceData.add(seqResourceIterateReader);
readersOfUnSequenceData.add(unseqResourceMergeReader);
}
List<AggreResultData> aggreResultDataList = new ArrayList<>();
//TODO use multi-thread
......@@ -264,8 +263,11 @@ public class AggregateEngineExecutor {
QueryResourceManager.getInstance().beginQueryOfGivenExpression(context.getJobId(), expression);
EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(expression, context);
List<IReaderByTimeStamp> readersOfSelectedSeries = SeriesReaderFactoryImpl.getInstance()
.createSeriesReadersByTimestamp(selectedSeries, context);
List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
for (Path path : selectedSeries) {
SeriesReaderByTimestamp seriesReaderByTimestamp = new SeriesReaderByTimestamp(path, context);
readersOfSelectedSeries.add(seriesReaderByTimestamp);
}
List<AggregateFunction> aggregateFunctions = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
......@@ -286,7 +288,7 @@ public class AggregateEngineExecutor {
private List<AggreResultData> aggregateWithValueFilter(
List<AggregateFunction> aggregateFunctions,
EngineTimeGenerator timestampGenerator,
List<IReaderByTimeStamp> readersOfSelectedSeries)
List<IReaderByTimestamp> readersOfSelectedSeries)
throws IOException {
while (timestampGenerator.hasNext()) {
......
......@@ -21,16 +21,17 @@ package org.apache.iotdb.db.query.executor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithValueFilter;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutValueFilter;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
......@@ -55,6 +56,8 @@ public class EngineExecutor {
*/
public QueryDataSet executeWithoutValueFilter(QueryContext context)
throws StorageEngineException, IOException {
QueryResourceManager.getInstance()
.beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
Filter timeFilter = null;
if (queryExpression.hasQueryFilter()) {
......@@ -63,21 +66,15 @@ public class EngineExecutor {
List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
QueryResourceManager.getInstance()
.beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
for (Path path : queryExpression.getSelectedSeries()) {
// add data type
try {
// add data type
dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
} catch (PathErrorException e) {
throw new StorageEngineException(e);
}
IPointReader reader = SeriesReaderFactoryImpl.getInstance()
.createSeriesReaderWithoutValueFilter(path, timeFilter, context);
IPointReader reader = new SeriesReaderWithoutValueFilter(path, timeFilter, context);
readersOfSelectedSeries.add(reader);
}
......@@ -102,22 +99,21 @@ public class EngineExecutor {
QueryResourceManager.getInstance()
.beginQueryOfGivenExpression(context.getJobId(), queryExpression.getExpression());
EngineTimeGenerator timestampGenerator;
List<IReaderByTimeStamp> readersOfSelectedSeries;
timestampGenerator = new EngineTimeGenerator(queryExpression.getExpression(), context);
readersOfSelectedSeries = SeriesReaderFactoryImpl.getInstance()
.createSeriesReadersByTimestamp(queryExpression.getSelectedSeries(), context);
EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(
queryExpression.getExpression(), context);
List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
for (Path path : queryExpression.getSelectedSeries()) {
try {
// add data type
dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
} catch (PathErrorException e) {
throw new StorageEngineException(e);
}
SeriesReaderByTimestamp seriesReaderByTimestamp = new SeriesReaderByTimestamp(path, context);
readersOfSelectedSeries.add(seriesReaderByTimestamp);
}
return new EngineDataSetWithValueFilter(queryExpression.getSelectedSeries(), dataTypes,
timestampGenerator,
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.factory;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
import java.util.List;
/**
 * This interface defines how readers are constructed for the different data sources. Note that a
 * job id of -1 means the caller is the IoTDB merge process, in which case there is no need to
 * maintain the opened file stream.
 */
public interface ISeriesReaderFactory {

  /**
   * Creates a reader for the unsequence data of one series, used to serve IoTDB requests such as
   * query, aggregation and group-by.
   *
   * @param seriesPath the path of the time series to be read
   * @param unseqResources unsequence data in the seriesPath
   * @param context query context
   * @param filter a combination of time and value filters, or null for no filtering
   * @return unsequence series reader
   */
  IPointReader createUnseqSeriesReader(Path seriesPath, List<TsFileResource> unseqResources,
      QueryContext context,
      Filter filter) throws IOException;

  /**
   * Constructs a list of SeriesReaderByTimestamp, covering both sequence and unsequence data.
   *
   * @param paths the paths of the time series to be read
   * @param context query context
   * @return a list of IReaderByTimeStamp, one per path, in the same order as {@code paths}
   */
  List<IReaderByTimeStamp> createSeriesReadersByTimestamp(List<Path> paths,
      QueryContext context) throws StorageEngineException, IOException;

  /**
   * Constructs an IPointReader with <b>only a time filter or no filter</b>, covering both sequence
   * and unsequence data. This reader does not filter the merged result of the sequence and
   * unsequence data readers.
   *
   * @param path the path of the time series to be read
   * @param timeFilter time filter or null
   * @param context query context
   * @return data reader including seq and unseq data source.
   */
  IPointReader createSeriesReaderWithoutValueFilter(Path path, Filter timeFilter,
      QueryContext context) throws StorageEngineException, IOException;

  /**
   * Constructs an IPointReader with a <b>value filter</b>, covering both sequence and unsequence
   * data. This reader filters the merged result of the sequence and unsequence data readers, so if
   * only a time filter is needed, call createSeriesReaderWithoutValueFilter() instead.
   *
   * @param path selected series path
   * @param filter filter that contains at least a value filter
   * @param context query context
   * @return data reader including sequence and unsequence data source.
   */
  IPointReader createSeriesReaderWithValueFilter(Path path, Filter filter, QueryContext context)
      throws StorageEngineException, IOException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.factory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.SeriesReaderWithValueFilter;
import org.apache.iotdb.db.query.reader.SeriesReaderWithoutValueFilter;
import org.apache.iotdb.db.query.reader.mem.MemChunkReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
import org.apache.iotdb.db.query.reader.merge.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.reader.sequence.SequenceSeriesReader;
import org.apache.iotdb.db.query.reader.sequence.SequenceSeriesReaderByTimestamp;
import org.apache.iotdb.db.query.reader.unsequence.DiskChunkReader;
import org.apache.iotdb.db.query.reader.unsequence.DiskChunkReaderByTimestamp;
import org.apache.iotdb.db.query.reader.unsequence.UnsequenceSeriesReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
/**
 * A singleton factory that assembles the series-level readers used by the query engine.
 *
 * <p>On top of the raw data sources (sealed/unsealed TsFiles and flushing memtables) it builds:
 * unsequence priority-merge readers, readers by timestamp, and series readers with or without a
 * value filter.
 */
public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {

  private SeriesReaderFactoryImpl() {
    // private constructor: enforce the singleton pattern, use getInstance()
  }

  public static SeriesReaderFactoryImpl getInstance() {
    return SeriesReaderFactoryHelper.INSTANCE;
  }

  /**
   * Creates a priority merge reader over all unsequence resources of {@code seriesPath}.
   *
   * <p>Readers added later receive a larger priority value, so data from later resources (and from
   * the memtable of an unclosed resource) overrides earlier data at the same timestamp.
   *
   * @param seriesPath the path of the queried time series
   * @param unseqResources the unsequence TsFile resources to read
   * @param context query context, used to look up modifications (deletions)
   * @param filter filter pushed down to the chunk readers; may be null
   */
  @Override
  public IPointReader createUnseqSeriesReader(Path seriesPath, List<TsFileResource> unseqResources,
      QueryContext context, Filter filter) throws IOException {
    UnsequenceSeriesReader unseqMergeReader = new UnsequenceSeriesReader();
    int priorityValue = 1;
    for (TsFileResource tsFileResource : unseqResources) {
      priorityValue = constructReaders(tsFileResource, unseqMergeReader, context, filter,
          seriesPath, priorityValue);
    }
    // TODO add external sort when needed
    return unseqMergeReader;
  }

  /**
   * Adds readers for one unsequence resource: one reader per on-disk chunk and, if the resource is
   * still unclosed, one reader for its in-memory chunk.
   *
   * @return the next unused priority value
   */
  private int constructReaders(TsFileResource tsFileResource,
      UnsequenceSeriesReader unseqMergeReader, QueryContext context, Filter filter,
      Path seriesPath, int priorityValue) throws IOException {
    int newPriority = constructChunkReaders(tsFileResource, seriesPath, context, filter,
        unseqMergeReader, priorityValue);
    // add reader for MemTable
    if (!tsFileResource.isClosed()) {
      unseqMergeReader.addReaderWithPriority(
          new MemChunkReader(tsFileResource.getReadOnlyMemChunk(), filter), newPriority++);
    }
    return newPriority;
  }

  /**
   * Adds one {@link DiskChunkReader} per chunk of the given resource, skipping chunks whose digest
   * statistics cannot satisfy {@code filter}.
   *
   * @return the next unused priority value
   */
  private int constructChunkReaders(TsFileResource tsFileResource, Path seriesPath,
      QueryContext context, Filter filter, UnsequenceSeriesReader unseqMergeReader, int priority)
      throws IOException {
    int newPriority = priority;
    // store only one opened file stream into manager, to avoid too many opened files
    TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
        .get(tsFileResource.getFile().getPath(), tsFileResource.isClosed());
    List<ChunkMetaData> metaDataList =
        getModifiedChunkMetaDataList(tsFileResource, seriesPath, context, tsFileReader);
    ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileReader);
    for (ChunkMetaData chunkMetaData : metaDataList) {
      // pre-filter by chunk statistics before loading the chunk payload
      if (filter != null && !chunkCanSatisfy(chunkMetaData, filter)) {
        continue;
      }
      Chunk chunk = chunkLoader.getChunk(chunkMetaData);
      ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
          : new ChunkReaderWithoutFilter(chunk);
      unseqMergeReader.addReaderWithPriority(new DiskChunkReader(chunkReader), newPriority++);
    }
    return newPriority;
  }

  /**
   * Returns true if the chunk's digest (time range and min/max statistics) may satisfy the filter.
   */
  private boolean chunkCanSatisfy(ChunkMetaData chunkMetaData, Filter filter) {
    DigestForFilter digest = new DigestForFilter(chunkMetaData.getStartTime(),
        chunkMetaData.getEndTime(),
        chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MIN_VALUE),
        chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MAX_VALUE),
        chunkMetaData.getTsDataType());
    return filter.satisfy(digest);
  }

  /**
   * Returns the chunk metadata of {@code seriesPath} in the given resource, with deletions
   * (modifications) already applied for closed files.
   *
   * <p>For an unclosed resource the metadata kept in memory is returned directly. This helper
   * removes the duplication formerly present in the merge-reader and by-timestamp creation paths.
   */
  private List<ChunkMetaData> getModifiedChunkMetaDataList(TsFileResource tsFileResource,
      Path seriesPath, QueryContext context, TsFileSequenceReader tsFileReader)
      throws IOException {
    if (!tsFileResource.isClosed()) {
      return tsFileResource.getChunkMetaDatas();
    }
    MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
    List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
    // apply deletions recorded in the mods file
    List<Modification> pathModifications = context
        .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
    if (!pathModifications.isEmpty()) {
      QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
    }
    return metaDataList;
  }

  /**
   * Creates a by-timestamp priority merge reader over all unsequence resources of
   * {@code seriesPath}. No filter is applied: by-timestamp access already selects by time.
   */
  private SeriesReaderByTimestamp createUnseqSeriesReaderByTimestamp(Path seriesPath,
      List<TsFileResource> unseqResources, QueryContext context) throws IOException {
    SeriesReaderByTimestamp unseqMergeReader = new SeriesReaderByTimestamp();
    int priorityValue = 1;
    for (TsFileResource tsFileResource : unseqResources) {
      // store only one opened file stream into manager, to avoid too many opened files
      TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
          .get(tsFileResource.getFile().getPath(), tsFileResource.isClosed());
      List<ChunkMetaData> metaDataList =
          getModifiedChunkMetaDataList(tsFileResource, seriesPath, context, tsFileReader);
      // add reader for chunk
      ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileReader);
      for (ChunkMetaData chunkMetaData : metaDataList) {
        Chunk chunk = chunkLoader.getChunk(chunkMetaData);
        ChunkReaderByTimestamp chunkReader = new ChunkReaderByTimestamp(chunk);
        unseqMergeReader.addReaderWithPriority(new DiskChunkReaderByTimestamp(chunkReader),
            priorityValue++);
      }
      // add reader for MemTable
      if (!tsFileResource.isClosed()) {
        unseqMergeReader.addReaderWithPriority(
            new MemChunkReaderByTimestamp(tsFileResource.getReadOnlyMemChunk()), priorityValue++);
      }
    }
    // TODO add external sort when needed
    return unseqMergeReader;
  }

  /**
   * Creates one by-timestamp reader per queried path, each merging the sequence reader (priority 1)
   * with the unsequence reader (priority 2, so unsequence data wins on conflicts).
   */
  @Override
  public List<IReaderByTimeStamp> createSeriesReadersByTimestamp(List<Path> paths,
      QueryContext context) throws StorageEngineException, IOException {
    List<IReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
    for (Path path : paths) {
      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
          .getQueryDataSource(path, context);
      SeriesReaderByTimestamp mergeReaderByTimestamp = new SeriesReaderByTimestamp();
      // reader for sequence data
      SequenceSeriesReaderByTimestamp tsFilesReader = new SequenceSeriesReaderByTimestamp(path,
          queryDataSource.getSeqResources(), context);
      mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
      // reader for unSequence data
      SeriesReaderByTimestamp unSeqMergeReader = createUnseqSeriesReaderByTimestamp(path,
          queryDataSource.getUnseqResources(), context);
      mergeReaderByTimestamp.addReaderWithPriority(unSeqMergeReader, 2);
      readersOfSelectedSeries.add(mergeReaderByTimestamp);
    }
    return readersOfSelectedSeries;
  }

  /**
   * Creates a reader merging sequence and unsequence data under an optional time filter.
   *
   * <p>If the sequence part is empty, the unsequence reader is returned directly.
   */
  @Override
  public IPointReader createSeriesReaderWithoutValueFilter(Path path, Filter timeFilter,
      QueryContext context) throws StorageEngineException, IOException {
    QueryDataSource queryDataSource = QueryResourceManager.getInstance()
        .getQueryDataSource(path, context);
    // sequence reader for one sealed tsfile
    SequenceSeriesReader tsFilesReader = new SequenceSeriesReader(queryDataSource.getSeriesPath(),
        queryDataSource.getSeqResources(), timeFilter, context);
    // unseq reader for all chunk groups in unseqFile
    IPointReader unseqMergeReader = createUnseqSeriesReader(path,
        queryDataSource.getUnseqResources(), context, timeFilter);
    if (!tsFilesReader.hasNext()) {
      // only have unsequence data.
      return unseqMergeReader;
    } else {
      // merge sequence data with unsequence data.
      return new SeriesReaderWithoutValueFilter(tsFilesReader, unseqMergeReader);
    }
  }

  /**
   * Creates a reader merging sequence and unsequence data under a value filter.
   *
   * <p>The filter is pushed down only to the sequence reader; the unsequence reader is built with a
   * null filter because filters are not pushed down into the unsequence data source.
   */
  @Override
  public IPointReader createSeriesReaderWithValueFilter(Path path, Filter filter,
      QueryContext context) throws StorageEngineException, IOException {
    QueryDataSource queryDataSource = QueryResourceManager.getInstance()
        .getQueryDataSource(path, context);
    // sequence reader for one sealed tsfile
    SequenceSeriesReader tsFilesReader = new SequenceSeriesReader(queryDataSource.getSeriesPath(),
        queryDataSource.getSeqResources(), filter, context);
    // unseq reader for all chunk groups in unseqFile. Filter for unseqMergeReader is null, because
    // we won't push down filter in unsequence data source.
    IPointReader unseqMergeReader = createUnseqSeriesReader(path,
        queryDataSource.getUnseqResources(), context, null);
    return new SeriesReaderWithValueFilter(tsFilesReader, unseqMergeReader, filter);
  }

  /** Initialization-on-demand holder: lazily and thread-safely creates the singleton. */
  private static class SeriesReaderFactoryHelper {

    private static final SeriesReaderFactoryImpl INSTANCE = new SeriesReaderFactoryImpl();

    private SeriesReaderFactoryHelper() {
    }
  }
}
......@@ -22,8 +22,8 @@ package org.apache.iotdb.db.query.fill;
import java.io.IOException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
......@@ -53,8 +53,7 @@ public abstract class IFill {
void constructReaders(Path path, QueryContext context, long beforeRange)
throws IOException, StorageEngineException {
Filter timeFilter = constructFilter(beforeRange);
allDataReader = SeriesReaderFactoryImpl.getInstance()
.createSeriesReaderWithoutValueFilter(path, timeFilter, context);
allDataReader = new SeriesReaderWithoutValueFilter(path, timeFilter, context);
}
public abstract IPointReader getFillResult() throws IOException;
......
......@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader;
import java.io.IOException;
......@@ -25,8 +24,10 @@ import org.apache.iotdb.tsfile.file.header.PageHeader;
public interface IAggregateReader extends IBatchReader {
/**
* Returns meta-information of batch data. If batch data comes from memory, return null. If batch
* data comes from page data, return pageHeader.
* Returns meta-information of batch data.
* <p>
* Returns null if batch data comes from memory. Returns pageHeader if batch data comes from page
* data.
*/
PageHeader nextPageHeader() throws IOException;
......
......@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader;
import java.io.IOException;
......
......@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader;
import java.io.IOException;
import org.apache.iotdb.db.utils.TimeValuePair;
public interface IPointReader{
public interface IPointReader {
boolean hasNext() throws IOException;
......
......@@ -20,14 +20,19 @@ package org.apache.iotdb.db.query.reader;
import java.io.IOException;
public interface IReaderByTimeStamp {
public interface IReaderByTimestamp {
/**
* Given a timestamp, the reader is supposed to return the corresponding value in the timestamp.
* If no value in this timestamp, null will be returned.
*
* Note that when the higher layer needs to call this function multiple times, it is required that the timestamps
* be in strictly increasing order.
* Returns the corresponding value under this timestamp. Returns null if no value under this
* timestamp.
* <p>
* Note that calling this method will change the status of this reader irreversibly just like
* <code>next</code>. The difference is that <code>next</code> moves one step forward while
* <code>getValueInTimestamp</code> advances towards the given timestamp.
* <p>
* Attention: DO call this method with monotonically increasing timestamps. There is no guarantee
* of correctness with any other way of calling. For example, DO NOT call this method twice with
* the same timestamp.
*/
Object getValueInTimestamp(long timestamp) throws IOException;
......
......@@ -16,19 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.unsequence;
package org.apache.iotdb.db.query.reader.chunkRelated;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import java.io.IOException;
/**
* IPointReader of data in a chunk.
* Note that currently this class is only used by unsequence query logic.
* To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
* data reader {@link ChunkReader}.
* <p>
* Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
* are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader}.
*/
public class DiskChunkReader implements IPointReader {
......@@ -66,7 +70,7 @@ public class DiskChunkReader implements IPointReader {
}
@Override
public void close() throws IOException {
public void close() {
this.chunkReader.close();
}
}
......@@ -16,30 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.unsequence;
package org.apache.iotdb.db.query.reader.chunkRelated;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
import java.io.IOException;
/**
* IReaderByTimeStamp of data in a chunk.
* Note that currently this class is only used by unsequence query logic.
* To read chunk data on disk by timestamp, this class implements an interface {@link
* IReaderByTimestamp} based on the data reader {@link ChunkReaderByTimestamp}.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceReaderByTimestamp}.
*/
public class DiskChunkReaderByTimestamp implements IReaderByTimeStamp {
public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
private ChunkReaderByTimestamp chunkReader;
private ChunkReaderByTimestamp chunkReaderByTimestamp;
private BatchData data;
public DiskChunkReaderByTimestamp(ChunkReaderByTimestamp chunkReader) {
this.chunkReader = chunkReader;
public DiskChunkReaderByTimestamp(ChunkReaderByTimestamp chunkReaderByTimestamp) {
this.chunkReaderByTimestamp = chunkReaderByTimestamp;
}
/**
* get value with time equals timestamp. If there is no such point, return null.
*/
@Override
public Object getValueInTimestamp(long timestamp) throws IOException {
......@@ -55,9 +53,9 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimeStamp {
if (data.hasNext()) {
return null;
} else {
chunkReader.setCurrentTimestamp(timestamp);
if (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
if (chunkReaderByTimestamp.hasNextBatch()) {
data = chunkReaderByTimestamp.nextBatch();
} else {
return null;
}
......@@ -72,10 +70,10 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimeStamp {
if (data != null && data.hasNext()) {
return true;
}
if (chunkReader != null && chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) {
data = chunkReaderByTimestamp.nextBatch();
return true;
}
return false;
}
}
}
\ No newline at end of file
......@@ -16,20 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.chunkRelated;
package org.apache.iotdb.db.query.reader.mem;
import java.util.Iterator;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.fileRelated.UnSealedTsFileIterateReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.Iterator;
/**
* To read chunk data in memory, this class implements two interfaces {@link IPointReader} and
* {@link IAggregateReader} based on the data source {@link ReadOnlyMemChunk}.
* <p>
* This class is used in {@link UnSealedTsFileIterateReader} and {@link
* org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader}.
*/
public class MemChunkReader implements IPointReader, IAggregateReader {
private Iterator<TimeValuePair> timeValuePairIterator;
......@@ -39,9 +45,6 @@ public class MemChunkReader implements IPointReader, IAggregateReader {
private TSDataType dataType;
/**
* memory data reader.
*/
public MemChunkReader(ReadOnlyMemChunk readableChunk, Filter filter) {
timeValuePairIterator = readableChunk.getIterator();
this.filter = filter;
......@@ -117,4 +120,4 @@ public class MemChunkReader implements IPointReader, IAggregateReader {
public void skipPageData() {
nextBatch();
}
}
}
\ No newline at end of file
......@@ -16,21 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.mem;
package org.apache.iotdb.db.query.reader.chunkRelated;
import java.util.Iterator;
import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.fileRelated.UnSealedTsFileReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
public class MemChunkReaderByTimestamp implements IReaderByTimeStamp {
/**
* To read data in memory by timestamp, this class implements an interface {@link
* IReaderByTimestamp} based on the data source {@link ReadOnlyMemChunk}.
* <p>
* This class is used in {@link UnSealedTsFileReaderByTimestamp} and {@link
* org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceReaderByTimestamp}.
*/
public class MemChunkReaderByTimestamp implements IReaderByTimestamp {
private Iterator<TimeValuePair> timeValuePairIterator;
private boolean hasCachedTimeValuePair;
private TimeValuePair cachedTimeValuePair;
public MemChunkReaderByTimestamp(TimeValuePairSorter readableChunk) {
public MemChunkReaderByTimestamp(ReadOnlyMemChunk readableChunk) {
timeValuePairIterator = readableChunk.getIterator();
}
......
......@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.sequence.adapter;
package org.apache.iotdb.db.query.reader.fileRelated;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IAggregateReader;
......@@ -25,7 +25,14 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
/**
* This class is used to convert FileSeriesReader (in TsFile) to IAggregateReader.
* To read a sequence TsFile's on-disk data, this class implements an interface {@link
* IAggregateReader} based on the data reader {@link FileSeriesReader}.
* <p>
* Note that <code>FileSeriesReader</code> is an abstract class with two concrete classes:
* <code>FileSeriesReaderWithoutFilter</code> and <code>FileSeriesReaderWithFilter</code>.
* <p>
* This class is used in {@link UnSealedTsFileIterateReader} and {@link
* org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader}.
*/
public class FileSeriesReaderAdapter implements IAggregateReader {
......
......@@ -16,21 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.sequence.adapter;
package org.apache.iotdb.db.query.reader.fileRelated;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderByTimestamp;
/**
* This class is used to convert FileSeriesReaderByTimestamp (in TsFile) to IReaderByTimeStamp.
* To read a sequence TsFile's on-disk data by timestamp, this class implements an interface {@link
* IReaderByTimestamp} based on the data reader {@link FileSeriesReaderByTimestamp}.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceReaderByTimestamp}.
*/
public class FileSeriesReaderByTimestampAdapter implements IReaderByTimeStamp {
public class FileSeriesReaderByTimestampAdapter implements IReaderByTimestamp {
private FileSeriesReaderByTimestamp fileSeriesReaderByTimestamp;
public FileSeriesReaderByTimestampAdapter(FileSeriesReaderByTimestamp fileSeriesReaderByTimestamp) {
public FileSeriesReaderByTimestampAdapter(
FileSeriesReaderByTimestamp fileSeriesReaderByTimestamp) {
this.fileSeriesReaderByTimestamp = fileSeriesReaderByTimestamp;
}
......
......@@ -16,18 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.sequence;
package org.apache.iotdb.db.query.reader.fileRelated;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReader;
import org.apache.iotdb.db.query.reader.sequence.adapter.FileSeriesReaderAdapter;
import org.apache.iotdb.db.query.reader.chunkRelated.MemChunkReader;
import org.apache.iotdb.db.query.reader.universal.IterateReader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
......@@ -38,59 +36,95 @@ import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter;
/**
* batch reader of data in: 1) the data in unseal tsfile part which has been flushed to disk; 2) the
* data in flushing memtable list.
* To read an unsealed sequence TsFile, this class extends {@link IterateReader} to implement {@link
* IAggregateReader} for the TsFile.
* <p>
* Note that an unsealed sequence TsFile consists of two parts of data in chronological order: 1)
* data that has been flushed to disk and 2) data in the flushing memtable list.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader}.
*/
public class UnSealedTsFileReader extends IterateReader {
public class UnSealedTsFileIterateReader extends IterateReader {
private TsFileResource unsealedTsFile;
private Filter filter;
List<IAggregateReader> unSealedResources;
/**
* Whether the reverse order is enabled.
* <p>
* True to iterate over chunk data in reverse chronological order (from newest to oldest); False
* to iterate over chunk data in chronological order (from oldest to newest).
*/
private boolean enableReverse;
/**
* Construct funtion for UnSealedTsFileReader.
* Constructor function.
* <p>
* An unsealed sequence TsFile consists of two parts of data in chronological order: 1) data that
* has been flushed to disk and 2) data in the flushing memtable list. <code>IterateReader</code>
* is used to iterate over the two parts. Therefore, this method calls the parent class
* <code>IterateReader</code>'s constructor to set <code>readerSize</code> to be 2. Readers for
* the two parts of data are created in order later in the method <code>constructNextReader</code>.
*
* @param unsealedTsFile -param to initial
* @param filter -filter
* @param isReverse true-traverse chunks from behind forward; false-traverse chunks from front to
* back;
* @param unsealedTsFile the TsFileResource corresponding to the unsealed TsFile
* @param filter filter condition
* @param isReverse True to iterate over chunk data in reverse chronological order (from newest to
* oldest); False to iterate over chunk data in chronological order (from oldest to newest).
*/
public UnSealedTsFileReader(TsFileResource unsealedTsFile, Filter filter, boolean isReverse)
throws IOException {
public UnSealedTsFileIterateReader(TsFileResource unsealedTsFile, Filter filter,
boolean isReverse) {
super(2);
TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
.get(unsealedTsFile.getFile().getPath(), false);
ChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
this.enableReverse = isReverse;
this.unsealedTsFile = unsealedTsFile;
this.filter = filter;
}
List<ChunkMetaData> metaDataList = unsealedTsFile.getChunkMetaDatas();
// reverse chunk metadata list if traversing chunks from behind forward
if (isReverse && metaDataList != null && !metaDataList.isEmpty()) {
Collections.reverse(metaDataList);
@Override
protected boolean constructNextReader(int idx) throws IOException {
if (idx == 0) {
if (enableReverse) {
// data in memory first if it is to iterate over chunk data in reverse chronological order
currentSeriesReader = new MemChunkReader(unsealedTsFile.getReadOnlyMemChunk(), filter);
} else {
// data on disk first if it is to iterate over chunk data in chronological order
currentSeriesReader = initUnSealedTsFileDiskReader(unsealedTsFile, filter);
}
} else { // idx=1
if (enableReverse) {
currentSeriesReader = initUnSealedTsFileDiskReader(unsealedTsFile, filter);
} else {
currentSeriesReader = new MemChunkReader(unsealedTsFile.getReadOnlyMemChunk(), filter);
}
}
return true;
}
// data in unseal tsfile which has been flushed to disk
FileSeriesReader unSealedReader;
if (filter == null) {
unSealedReader = new FileSeriesReaderWithoutFilter(chunkLoader, metaDataList);
} else {
unSealedReader = new FileSeriesReaderWithFilter(chunkLoader, metaDataList, filter);
/**
* Creates <code>IAggregateReader</code> for an unsealed sequence TsFile's on-disk data.
*/
private IAggregateReader initUnSealedTsFileDiskReader(TsFileResource unSealedTsFile,
Filter filter)
throws IOException {
// prepare metaDataList
List<ChunkMetaData> metaDataList = unSealedTsFile.getChunkMetaDatas();
if (enableReverse && metaDataList != null && !metaDataList.isEmpty()) {
Collections.reverse(metaDataList);
}
unSealedResources = new ArrayList<>();
// data in flushing memtable
MemChunkReader memChunkReader = new MemChunkReader(unsealedTsFile.getReadOnlyMemChunk(),
filter);
if (isReverse) {
unSealedResources.add(memChunkReader);
unSealedResources.add(new FileSeriesReaderAdapter(unSealedReader));
} else {
unSealedResources.add(new FileSeriesReaderAdapter(unSealedReader));
unSealedResources.add(memChunkReader);
// prepare chunkLoader
TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
.get(unSealedTsFile.getFile().getPath(), false);
ChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
// init fileSeriesReader
FileSeriesReader fileSeriesReader;
if (filter == null) {
fileSeriesReader = new FileSeriesReaderWithoutFilter(chunkLoader, metaDataList);
} else {
fileSeriesReader = new FileSeriesReaderWithFilter(chunkLoader, metaDataList, filter);
}
}
@Override
public boolean constructNextReader(int idx) {
currentSeriesReader = unSealedResources.get(idx);
return true;
return new FileSeriesReaderAdapter(fileSeriesReader);
}
}
......@@ -16,78 +16,81 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.fileRelated;
package org.apache.iotdb.db.query.reader.sequence;
import java.io.IOException;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.chunkRelated.MemChunkReaderByTimestamp;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderByTimestamp;
import java.io.IOException;
/**
* IReaderByTimeStamp of data in: 1) the data in unseal tsfile part which has been flushed to disk;
* 2) the data in flushing memtable list.
* To read an unsealed sequence TsFile by timestamp, this class implements an interface {@link
* IReaderByTimestamp} for the TsFile.
* <p>
* Note that an unsealed sequence TsFile consists of two parts of data in chronological order: 1)
* data that has been flushed to disk and 2) data in the flushing memtable list.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceReaderByTimestamp}.
*/
public class UnSealedTsFileReaderByTimestamp implements IReaderByTimeStamp {
public class UnSealedTsFileReaderByTimestamp implements IReaderByTimestamp {
protected Path seriesPath;
/**
* reader the data of unseal tsfile part which has been flushed to disk
*/
private FileSeriesReaderByTimestamp unSealedReader;
/**
* reader of the data in flushing memtable list
* <code>FileSeriesReaderByTimestamp</code> for data which has been flushed to disk.
*/
private IReaderByTimeStamp memSeriesReader;
private FileSeriesReaderByTimestamp unSealedTsFileDiskReaderByTs;
/**
* whether unSealedReader has been used. True if current reader is memSeriesReader,
* false if current reader is unSealedReader.
* <code>IReaderByTimestamp</code> for data in the flushing memtable list.
*/
private boolean unSealedReaderEnded;
private IReaderByTimestamp unSealedTsFileMemReaderByTs;
/**
* Construct funtion for UnSealedTsFileReader.
*
* @param tsFileResource -unclosed tsfile resource
* Whether unSealedTsFileDiskReaderByTs has been exhausted.
* <p>
* True means the current reader is unSealedTsFileMemReaderByTs; False means the current reader is
* still unSealedTsFileDiskReaderByTs.
*/
public UnSealedTsFileReaderByTimestamp(TsFileResource tsFileResource) throws IOException {
private boolean unSealedTsFileDiskReaderEnded;
public UnSealedTsFileReaderByTimestamp(TsFileResource unsealedTsFile) throws IOException {
// create IReaderByTimestamp for data in the flushing memtable list
unSealedTsFileMemReaderByTs = new MemChunkReaderByTimestamp(
unsealedTsFile.getReadOnlyMemChunk());
// create FileSeriesReaderByTimestamp for data which has been flushed to disk
TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
.get(tsFileResource.getFile().getPath(), false);
.get(unsealedTsFile.getFile().getPath(), false);
ChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
unSealedReader = new FileSeriesReaderByTimestamp(chunkLoader,
tsFileResource.getChunkMetaDatas());
unSealedTsFileDiskReaderByTs = new FileSeriesReaderByTimestamp(chunkLoader,
unsealedTsFile.getChunkMetaDatas());
memSeriesReader = new MemChunkReaderByTimestamp(tsFileResource.getReadOnlyMemChunk());
unSealedReaderEnded = false;
unSealedTsFileDiskReaderEnded = false;
}
@Override
public Object getValueInTimestamp(long timestamp) throws IOException {
Object value = null;
if (!unSealedReaderEnded) {
value = unSealedReader.getValueInTimestamp(timestamp);
}
if (value != null || unSealedReader.hasNext()) {
return value;
} else {
unSealedReaderEnded = true;
if (!unSealedTsFileDiskReaderEnded) {
Object value = unSealedTsFileDiskReaderByTs.getValueInTimestamp(timestamp);
if (value != null || unSealedTsFileDiskReaderByTs.hasNext()) {
return value;
} else {
unSealedTsFileDiskReaderEnded = true;
}
}
return memSeriesReader.getValueInTimestamp(timestamp);
return unSealedTsFileMemReaderByTs.getValueInTimestamp(timestamp);
}
@Override
public boolean hasNext() throws IOException {
if (unSealedReaderEnded) {
return memSeriesReader.hasNext();
if (unSealedTsFileDiskReaderEnded) {
return unSealedTsFileMemReaderByTs.hasNext();
}
return (unSealedReader.hasNext() || memSeriesReader.hasNext());
return (unSealedTsFileDiskReaderByTs.hasNext() || unSealedTsFileMemReaderByTs.hasNext());
}
}
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
......@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.sequence;
package org.apache.iotdb.db.query.reader.resourceRelated;
import java.io.IOException;
import java.util.Collections;
......@@ -25,7 +25,10 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.sequence.adapter.FileSeriesReaderAdapter;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.fileRelated.FileSeriesReaderAdapter;
import org.apache.iotdb.db.query.reader.fileRelated.UnSealedTsFileIterateReader;
import org.apache.iotdb.db.query.reader.universal.IterateReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
......@@ -39,15 +42,26 @@ import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter;
/**
* batch reader of data in: 1) sealed tsfile. 2) unsealed tsfile, which include data on disk of
* unsealed file and in memtables that will be flushing to unsealed tsfile.
* To read a chronologically ordered list of sequence TsFiles, this class extends {@link
* IterateReader} to implement <code>IAggregateReader</code> for the TsFiles.
* <p>
* Notes: 1) The list of sequence TsFiles is in strict chronological order. 2) The data in a
* sequence TsFile is also organized in chronological order. 3) A sequence TsFile can be either
* sealed or unsealed. 4) An unsealed sequence TsFile consists of two parts of data in chronological
* order: data that has been flushed to disk and data in the flushing memtable list.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter}.
*/
public class SequenceSeriesReader extends IterateReader {
public class SeqResourceIterateReader extends IterateReader {
private Path seriesPath;
/**
* Is reverse the sequence of tsfiles(include sealed and unsealed tsfile) and chunks in tsfiles.
* True-traverse chunks from behind forward. False-traverse chunks from front to back.
* Whether the reverse order is enabled.
* <p>
* True to iterate over the list of sequence TsFiles and chunks in TsFiles in reverse
* chronological order (from newest to oldest); False to iterate in chronological order (from
* oldest to newest).
*/
private boolean enableReverse;
......@@ -55,101 +69,118 @@ public class SequenceSeriesReader extends IterateReader {
private Filter filter;
private QueryContext context;
/**
* init with globalSortedSeriesDataSource, filter, context and isReverse.
* Constructor function.
* <p>
* <code>IterateReader</code> is used to iterate over the chronologically ordered list of
* sequence TsFiles. Therefore, this method calls the parent class <code>IterateReader</code>'s
* constructor to set <code>readerSize</code> to be the size of the TsFile list. Readers for the
* TsFiles are created in order later in the method <code>constructNextReader</code>.
*
* @param seriesPath data source
* @param seqResources sealed and unsealed tsfile resources
* @param filter null if no filter
* @param seriesPath the path of the series data
* @param seqResources a list of sequence TsFile resources in chronological order
* @param filter filter condition. Null if no filter.
* @param context query context
* @param isReverse true-traverse chunks from behind forward, false-traverse chunks from front to
* back.
* @param isReverse True to iterate over data in reverse chronological order (from newest to
* oldest); False to iterate over data in chronological order (from oldest to newest).
*/
public SequenceSeriesReader(Path seriesPath, List<TsFileResource> seqResources,
public SeqResourceIterateReader(Path seriesPath, List<TsFileResource> seqResources,
Filter filter, QueryContext context, boolean isReverse) {
super(seqResources.size());
this.seriesPath = seriesPath;
this.enableReverse = isReverse;
if (isReverse) {
Collections.reverse(seqResources);
}
this.seqResources = seqResources;
this.seriesPath = seriesPath;
this.enableReverse = isReverse;
this.context = context;
this.filter = filter;
this.context = context;
}
/**
* traverse chunks from front to back.
*/
public SequenceSeriesReader(Path seriesPath, List<TsFileResource> seqResources,
public SeqResourceIterateReader(Path seriesPath, List<TsFileResource> seqResources,
Filter timeFilter, QueryContext context) {
this(seriesPath, seqResources, timeFilter, context, false);
}
/**
* If the idx-th TsFile in the <code>seqResources</code> might satisfy this <code>filter</code>,
* then construct <code>IAggregateReader</code> for it, assign to <code>currentSeriesReader</code>
* and return true. Otherwise, return false.
*
* @param idx the index of the TsFile in the resource list
* @return True if the reader is constructed; False if not.
*/
@Override
public boolean constructNextReader(int idx) throws IOException {
TsFileResource tsFileResource = seqResources.get(idx);
if (tsFileResource.isClosed()) {
if (singleTsFileSatisfied(tsFileResource, filter)) {
currentSeriesReader = new FileSeriesReaderAdapter(
initSealedTsFileReader(tsFileResource, filter, context));
return true;
} else {
if (isTsFileNotSatisfied(tsFileResource, filter)) {
return false;
}
currentSeriesReader = initSealedTsFileReader(tsFileResource, filter, context);
return true;
} else {
currentSeriesReader = new UnSealedTsFileReader(tsFileResource, filter, enableReverse);
// an unsealed sequence TsFile's endTimeMap size may be equal to 0 or greater than 0
// If endTimeMap size is 0, conservatively assume that this TsFile might satisfy this filter.
// If endTimeMap size is not 0, call isTsFileNotSatisfied to check.
if (tsFileResource.getEndTimeMap().size() != 0) {
if (isTsFileNotSatisfied(tsFileResource, filter)) {
return false;
}
}
currentSeriesReader = new UnSealedTsFileIterateReader(tsFileResource, filter,
enableReverse);
return true;
}
}
/**
* check if skip the tsfile.
* Returns true if the start and end time of the series data in this sequence TsFile do not
* satisfy the filter condition. Returns false if satisfy.
* <p>
* This method is used in <code>constructNextReader</code> to check whether this TsFile can be
* skipped.
*
* @param tsfile tsfile resource.
* @param filter filter condition. If no filter, the filed is null.
* @param tsFile the TsFileResource corresponding to this TsFile
* @param filter filter condition. Null if no filter.
* @return True if the TsFile's start and end time do not satisfy the filter condition; False if
* satisfy.
*/
private boolean singleTsFileSatisfied(TsFileResource tsfile, Filter filter) {
private boolean isTsFileNotSatisfied(TsFileResource tsFile, Filter filter) {
if (filter == null) {
return true;
return false;
}
long startTime = tsfile.getStartTimeMap().get(seriesPath.getDevice());
long endTime = tsfile.getEndTimeMap().get(seriesPath.getDevice());
return filter.satisfyStartEndTime(startTime, endTime);
long startTime = tsFile.getStartTimeMap().get(seriesPath.getDevice());
long endTime = tsFile.getEndTimeMap().get(seriesPath.getDevice());
return !filter.satisfyStartEndTime(startTime, endTime);
}
private FileSeriesReader initSealedTsFileReader(TsFileResource tsfile, Filter filter,
QueryContext context)
throws IOException {
// to avoid too many opened files
private IAggregateReader initSealedTsFileReader(TsFileResource sealedTsFile, Filter filter,
QueryContext context) throws IOException {
// prepare metaDataList
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
.get(tsfile.getFile().getPath(), true);
.get(sealedTsFile.getFile().getPath(), true);
MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
List<Modification> pathModifications = context.getPathModifications(tsfile.getModFile(),
List<Modification> pathModifications = context.getPathModifications(sealedTsFile.getModFile(),
seriesPath.getFullPath());
if (!pathModifications.isEmpty()) {
QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
}
ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
if (enableReverse) {
Collections.reverse(metaDataList);
}
// prepare chunkLoader
ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
FileSeriesReader seriesReader;
// init fileSeriesReader
FileSeriesReader fileSeriesReader;
if (filter == null) {
seriesReader = new FileSeriesReaderWithoutFilter(chunkLoader, metaDataList);
fileSeriesReader = new FileSeriesReaderWithoutFilter(chunkLoader, metaDataList);
} else {
seriesReader = new FileSeriesReaderWithFilter(chunkLoader, metaDataList, filter);
fileSeriesReader = new FileSeriesReaderWithFilter(chunkLoader, metaDataList, filter);
}
return seriesReader;
return new FileSeriesReaderAdapter(fileSeriesReader);
}
}
}
\ No newline at end of file
......@@ -16,15 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.resourceRelated;
package org.apache.iotdb.db.query.reader.sequence;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.sequence.adapter.FileSeriesReaderByTimestampAdapter;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.fileRelated.FileSeriesReaderByTimestampAdapter;
import org.apache.iotdb.db.query.reader.fileRelated.UnSealedTsFileReaderByTimestamp;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
......@@ -34,52 +36,67 @@ import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderByTimestamp;
import java.io.IOException;
import java.util.List;
/**
* IReaderByTimeStamp of data in: 1) sealed tsfile. 2) unsealed tsfile, which include data on disk of
* unsealed file and in memtables that will be flushing to unsealed tsfile.
* To read a chronologically ordered list of sequence TsFiles by timestamp, this class implements
* <code>IReaderByTimestamp</code> for the TsFiles.
* <p>
* Notes: 1) The list of sequence TsFiles is in strict chronological order. 2) The data in a
* sequence TsFile is also organized in chronological order. 3) A sequence TsFile can be either
* sealed or unsealed. 4) An unsealed sequence TsFile consists of two parts of data in chronological
* order: data that has been flushed to disk and data in the flushing memtable list.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp}.
*/
public class SequenceSeriesReaderByTimestamp implements IReaderByTimeStamp {
public class SeqResourceReaderByTimestamp implements IReaderByTimestamp {
protected Path seriesPath;
private List<TsFileResource> tsFileResourceList;
private int nextIntervalFileIndex;
protected IReaderByTimeStamp seriesReader;
private List<TsFileResource> seqResources;
private QueryContext context;
private int nextIntervalFileIndex;
private IReaderByTimestamp seriesReader;
/**
* init with seriesPath and tsfile list which include sealed tsfile and unseadled tsfile.
* Constructor function.
* <p>
*
* @param seriesPath the path of the series data
* @param seqResources a list of sequence TsFile resources in chronological order
* @param context query context
*/
public SequenceSeriesReaderByTimestamp(Path seriesPath,
List<TsFileResource> tsFileResourceList,
QueryContext context) {
public SeqResourceReaderByTimestamp(Path seriesPath, List<TsFileResource> seqResources,
QueryContext context) {
this.seriesPath = seriesPath;
this.tsFileResourceList = tsFileResourceList;
this.seqResources = seqResources;
this.context = context;
this.nextIntervalFileIndex = 0;
this.seriesReader = null;
this.context = context;
}
@Override
public Object getValueInTimestamp(long timestamp) throws IOException {
Object value = null;
if (seriesReader != null) {
value = seriesReader.getValueInTimestamp(timestamp);
// if get value or no value in this timestamp, return.
// if get value or no value in this timestamp but has next, return.
if (value != null || seriesReader.hasNext()) {
return value;
}
}
constructReader(timestamp);
if (seriesReader != null) {
value = seriesReader.getValueInTimestamp(timestamp);
if (value != null || seriesReader.hasNext()) {
return value;
// Because the sequence TsFile resources are chronologically globally ordered, there exists at
// most one TsFile resource that overlaps this timestamp.
while (nextIntervalFileIndex < seqResources.size()) {
boolean isConstructed = constructNextReader(nextIntervalFileIndex++, timestamp);
if (isConstructed) {
value = seriesReader.getValueInTimestamp(timestamp);
// if get value or no value in this timestamp but has next, return.
if (value != null || seriesReader.hasNext()) {
return value;
}
}
}
return value;
}
......@@ -88,9 +105,14 @@ public class SequenceSeriesReaderByTimestamp implements IReaderByTimeStamp {
if (seriesReader != null && seriesReader.hasNext()) {
return true;
}
while (nextIntervalFileIndex < tsFileResourceList.size()) {
initSealedTsFileReader(tsFileResourceList.get(nextIntervalFileIndex), context);
nextIntervalFileIndex++;
while (nextIntervalFileIndex < seqResources.size()) {
TsFileResource tsFileResource = seqResources.get(nextIntervalFileIndex++);
if (tsFileResource.isClosed()) {
seriesReader = initSealedTsFileReaderByTimestamp(tsFileResource, context);
} else {
seriesReader = new UnSealedTsFileReaderByTimestamp(tsFileResource);
}
if (seriesReader.hasNext()) {
return true;
}
......@@ -99,58 +121,69 @@ public class SequenceSeriesReaderByTimestamp implements IReaderByTimeStamp {
}
/**
* construct reader with the file that might overlap this timestamp.
* If the idx-th TsFile in the <code>seqResources</code> might overlap this
* <code>timestamp</code>, then construct <code>IReaderByTimestamp</code> for it, assign to the
* <code>currentSeriesReader</code> and return true. Otherwise, return false.
* <p>
* Note that the list of sequence TsFiles is chronologically ordered, so there will be at most one
* TsFile that overlaps this timestamp.
*
* @param idx the index of the TsFile in the resource list
* @param timestamp check whether or not to construct the reader according to this timestamp
* @return True if the reader is constructed; False if not.
*/
private void constructReader(long timestamp) throws IOException {
while (nextIntervalFileIndex < tsFileResourceList.size()) {
TsFileResource tsFile = tsFileResourceList.get(nextIntervalFileIndex);
nextIntervalFileIndex++;
// init unsealed tsfile.
if (!tsFile.isClosed()) {
initUnSealedTsFileReader(tsFile);
return;
private boolean constructNextReader(int idx, long timestamp) throws IOException {
TsFileResource tsFileResource = seqResources.get(idx);
if (tsFileResource.isClosed()) {
if (isTsFileNotSatisfied(tsFileResource, timestamp)) {
return false;
}
// init sealed tsfile.
if (singleTsFileSatisfied(tsFile, timestamp)) {
initSealedTsFileReader(tsFile, context);
return;
seriesReader = initSealedTsFileReaderByTimestamp(tsFileResource, context);
return true;
} else {
// an unsealed sequence TsFile's endTimeMap size may be equal to 0 or greater than 0
// If endTimeMap size is 0, conservatively assume that this TsFile might overlap this timestamp.
// If endTimeMap size is not 0, call isTsFileNotSatisfied to check.
if (tsFileResource.getEndTimeMap().size() != 0) {
if (isTsFileNotSatisfied(tsFileResource, timestamp)) {
return false;
}
}
seriesReader = new UnSealedTsFileReaderByTimestamp(tsFileResource);
return true;
}
}
/**
* Judge whether the file should be skipped.
* Returns true if the end time of the series data in this sequence TsFile is smaller than this
* timestamp.
* <p>
* Note that <code>seqResources</code> is a list of chronologically ordered sequence TsFiles, so
* there will be at most one TsFile that overlaps this timestamp.
* <p>
* This method is used in <code>constructNextReader</code> to check whether this TsFile can be
* skipped.
*/
private boolean singleTsFileSatisfied(TsFileResource fileNode, long timestamp) {
if (fileNode.isClosed()) {
return fileNode.getEndTimeMap().get(seriesPath.getDevice()) >= timestamp;
}
return true;
}
private void initUnSealedTsFileReader(TsFileResource tsFile)
throws IOException {
seriesReader = new UnSealedTsFileReaderByTimestamp(tsFile);
private boolean isTsFileNotSatisfied(TsFileResource tsFile, long timestamp) {
return tsFile.getEndTimeMap().get(seriesPath.getDevice()) < timestamp;
}
private void initSealedTsFileReader(TsFileResource fileNode, QueryContext context)
throws IOException {
// to avoid too many opened files
private IReaderByTimestamp initSealedTsFileReaderByTimestamp(TsFileResource sealedTsFile,
QueryContext context) throws IOException {
// prepare metaDataList
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
.get(fileNode.getFile().getPath(), true);
.get(sealedTsFile.getFile().getPath(), true);
MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
List<Modification> pathModifications = context.getPathModifications(fileNode.getModFile(),
seriesPath.getFullPath());
List<Modification> pathModifications = context.getPathModifications(sealedTsFile.getModFile(),
seriesPath.getFullPath());
if (!pathModifications.isEmpty()) {
QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
}
// prepare chunkLoader
ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
seriesReader = new FileSeriesReaderByTimestampAdapter(
new FileSeriesReaderByTimestamp(chunkLoader, metaDataList));
return new FileSeriesReaderByTimestampAdapter(
new FileSeriesReaderByTimestamp(chunkLoader, metaDataList));
}
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.resourceRelated;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.chunkRelated.DiskChunkReader;
import org.apache.iotdb.db.query.reader.chunkRelated.MemChunkReader;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
/**
* To read a list of unsequence TsFiles, this class extends {@link PriorityMergeReader} to
* implement
* <code>IPointReader</code> for the TsFiles.
* <p>
* Note that an unsequence TsFile can be either closed or unclosed. An unclosed unsequence TsFile
* consists of data on disk and data in memtables that will be flushed to this unclosed TsFile.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter}.
*/
public class UnseqResourceMergeReader extends PriorityMergeReader {

  // Path of the series under query; used both to fetch chunk metadata and to look up the
  // per-device start/end times when deciding whether a whole TsFile can be skipped.
  private Path seriesPath;

  /**
   * Constructor.
   * <p>
   * For every unsequence TsFile resource that might satisfy the filter, creates one
   * {@link DiskChunkReader} per candidate chunk (plus one {@link MemChunkReader} for the
   * in-memory part of an unclosed file) and registers each with a strictly increasing
   * priority value. How the priority values resolve overlapping data is defined by the
   * parent {@code PriorityMergeReader}.
   *
   * @param seriesPath the path of the series data
   * @param unseqResources a list of unsequence TsFile resources
   * @param context query context
   * @param filter filter condition. Null if no filter.
   * @throws IOException if reading a TsFile or loading a chunk fails
   */
  public UnseqResourceMergeReader(Path seriesPath, List<TsFileResource> unseqResources,
      QueryContext context, Filter filter) throws IOException {
    this.seriesPath = seriesPath;
    int priorityValue = 1;
    for (TsFileResource tsFileResource : unseqResources) {
      // The second argument tells the manager whether this is a closed (sealed) file.
      TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
          .get(tsFileResource.getFile().getPath(), tsFileResource.isClosed());

      // prepare metaDataList
      List<ChunkMetaData> metaDataList;
      if (tsFileResource.isClosed()) {
        // Closed file: skip it entirely if its time range cannot satisfy the filter;
        // otherwise read its chunk metadata from disk and apply any deletions
        // (modifications) recorded for this series.
        if (isTsFileNotSatisfied(tsFileResource, filter)) {
          continue;
        }
        MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
        metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
        List<Modification> pathModifications = context
            .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
        if (!pathModifications.isEmpty()) {
          QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
        }
      } else {
        // Unclosed file: its endTimeMap may still be empty. If it is empty, conservatively
        // assume this file might satisfy the filter; otherwise check and possibly skip it.
        if (tsFileResource.getEndTimeMap().size() != 0) {
          if (isTsFileNotSatisfied(tsFileResource, filter)) {
            continue;
          }
        }
        metaDataList = tsFileResource.getChunkMetaDatas();
      }

      // create and add ChunkReader with priority
      ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileReader);
      for (ChunkMetaData chunkMetaData : metaDataList) {
        // Use the chunk's digest (time range plus min/max statistics) to skip chunks that
        // cannot satisfy the filter, without loading their data.
        DigestForFilter digest = new DigestForFilter(chunkMetaData.getStartTime(),
            chunkMetaData.getEndTime(),
            chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MIN_VALUE),
            chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MAX_VALUE),
            chunkMetaData.getTsDataType());
        if (filter != null && !filter.satisfy(digest)) {
          continue;
        }
        Chunk chunk = chunkLoader.getChunk(chunkMetaData);
        ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
            : new ChunkReaderWithoutFilter(chunk);
        addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue);
        priorityValue++;
      }

      if (!tsFileResource.isClosed()) {
        // create and add MemChunkReader with priority: the memtable part of an unclosed
        // file is registered last for that file, so it gets the next (highest so far)
        // priority value.
        addReaderWithPriority(
            new MemChunkReader(tsFileResource.getReadOnlyMemChunk(), filter), priorityValue++);
      }
    }
  }

  /**
   * Returns true if the start and end time of the series data in this unsequence TsFile do not
   * satisfy the filter condition. Returns false if they satisfy.
   * <p>
   * This method is used in the constructor to check whether this TsFile can be skipped.
   *
   * @param tsFile the TsFileResource corresponding to this TsFile
   * @param filter filter condition. Null if no filter.
   * @return True if the TsFile's start and end time do not satisfy the filter condition; False if
   * they satisfy.
   */
  // TODO future work: deduplicate code. See SeqResourceIterateReader.
  private boolean isTsFileNotSatisfied(TsFileResource tsFile, Filter filter) {
    if (filter == null) {
      return false;
    }
    long startTime = tsFile.getStartTimeMap().get(seriesPath.getDevice());
    long endTime = tsFile.getEndTimeMap().get(seriesPath.getDevice());
    return !filter.satisfyStartEndTime(startTime, endTime);
  }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.resourceRelated;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.chunkRelated.DiskChunkReaderByTimestamp;
import org.apache.iotdb.db.query.reader.chunkRelated.MemChunkReaderByTimestamp;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReaderByTimestamp;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
/**
* To read a list of unsequence TsFiles by timestamp, this class extends {@link
* PriorityMergeReaderByTimestamp} to implement <code>IReaderByTimestamp</code> for the TsFiles.
* <p>
* Note that an unsequence TsFile can be either closed or unclosed. An unclosed unsequence TsFile
* consists of data on disk and data in memtables that will be flushed to this unclosed TsFile.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp}.
*/
public class UnseqResourceReaderByTimestamp extends PriorityMergeReaderByTimestamp {

  /**
   * Constructor.
   * <p>
   * Walks the given unsequence TsFile resources in order and registers one timestamp-based
   * reader per chunk of the series (plus one reader over the read-only memtable data of an
   * unclosed file), each with a strictly increasing priority value. How the priority values
   * resolve overlapping data is defined by the parent {@code PriorityMergeReaderByTimestamp}.
   *
   * @param seriesPath the path of the series data
   * @param unseqResources a list of unsequence TsFile resources
   * @param context query context
   * @throws IOException if reading a TsFile or loading a chunk fails
   */
  public UnseqResourceReaderByTimestamp(Path seriesPath,
      List<TsFileResource> unseqResources, QueryContext context) throws IOException {
    int priority = 1;
    for (TsFileResource resource : unseqResources) {
      TsFileSequenceReader fileReader = FileReaderManager.getInstance()
          .get(resource.getFile().getPath(), resource.isClosed());

      // Gather the chunk metadata of this series: an unclosed file already holds it in
      // memory; a closed file is queried through its on-disk metadata, with any recorded
      // modifications (deletions) applied.
      List<ChunkMetaData> chunkMetaDataList;
      if (!resource.isClosed()) {
        chunkMetaDataList = resource.getChunkMetaDatas();
      } else {
        chunkMetaDataList =
            new MetadataQuerierByFileImpl(fileReader).getChunkMetaDataList(seriesPath);
        List<Modification> modifications =
            context.getPathModifications(resource.getModFile(), seriesPath.getFullPath());
        if (!modifications.isEmpty()) {
          QueryUtils.modifyChunkMetaData(chunkMetaDataList, modifications);
        }
      }

      // Register one disk reader per chunk, each with the next priority value.
      ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(fileReader);
      for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
        Chunk chunk = chunkLoader.getChunk(chunkMetaData);
        DiskChunkReaderByTimestamp diskReader =
            new DiskChunkReaderByTimestamp(new ChunkReaderByTimestamp(chunk));
        addReaderWithPriority(diskReader, priority++);
      }

      // For an unclosed file, also register a reader over its read-only memtable data with
      // the next priority value.
      if (!resource.isClosed()) {
        addReaderWithPriority(
            new MemChunkReaderByTimestamp(resource.getReadOnlyMemChunk()), priority++);
      }
    }
    // TODO add external sort when needed
    // TODO future work: create reader when getValueInTimestamp so that resources
    // whose start and end time do not satisfy can be skipped.
  }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceReaderByTimestamp;
import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceReaderByTimestamp;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReaderByTimestamp;
import org.apache.iotdb.tsfile.read.common.Path;
/**
* To read series data by timestamp, this class extends {@link PriorityMergeReaderByTimestamp} to
* implement <code>IReaderByTimestamp</code> for the data.
* <p>
* Note that series data consists of sequence and unsequence TsFile resources.
* <p>
* This class is used in conjunction with {@link org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator}.
*/
public class SeriesReaderByTimestamp extends PriorityMergeReaderByTimestamp {

  /**
   * Constructor.
   * <p>
   * Obtains the query data source for the given series and registers two readers by
   * timestamp: one over the sequence resources (priority 1) and one over the unsequence
   * resources (priority 2). How the priority values resolve overlapping data is defined by
   * the parent {@code PriorityMergeReaderByTimestamp}.
   *
   * @param seriesPath the path of the series data
   * @param context query context
   * @throws StorageEngineException if the query data source cannot be obtained
   * @throws IOException if constructing a resource reader fails
   */
  public SeriesReaderByTimestamp(Path seriesPath, QueryContext context)
      throws StorageEngineException, IOException {
    QueryDataSource dataSource = QueryResourceManager.getInstance()
        .getQueryDataSource(seriesPath, context);
    // reader for sequence resources
    SeqResourceReaderByTimestamp seqReader =
        new SeqResourceReaderByTimestamp(seriesPath, dataSource.getSeqResources(), context);
    // reader for unsequence resources
    UnseqResourceReaderByTimestamp unseqReader =
        new UnseqResourceReaderByTimestamp(seriesPath, dataSource.getUnseqResources(), context);
    addReaderWithPriority(seqReader, 1);
    addReaderWithPriority(unseqReader, 2);
  }
}
......@@ -16,16 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.seriesRelated;
package org.apache.iotdb.db.query.reader;
import java.io.IOException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
/**
* A value filter reader for read data source, including sequence data and unsequence data.
* To read series data with value filter, this class extends {@link SeriesReaderWithoutValueFilter}
* to implement <code>IPointReader</code> for the data.
* <p>
* Note that the difference between <code>SeriesReaderWithValueFilter</code> and
* <code>SeriesReaderWithoutValueFilter</code> is that the former executes in the form of
* filter(merge(filter(seqResource),unseqResource)) while the latter executes in the form of
* merge(filter(seqResource),filter(unseqResource)) (when <code>pushdownUnseq</code> is True). More
* see JIRA IOTDB-121. This difference is necessary to guarantee the correctness of the result.
* <p>
* This class is used by {@link org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator}.
*/
public class SeriesReaderWithValueFilter extends SeriesReaderWithoutValueFilter {
......@@ -33,12 +45,15 @@ public class SeriesReaderWithValueFilter extends SeriesReaderWithoutValueFilter
private boolean hasCachedValue;
private TimeValuePair timeValuePair;
/**
* merge sequence reader, unsequence reader.
*/
public SeriesReaderWithValueFilter(IBatchReader seqSeriesReader, IPointReader unseqSeriesReader,
Filter filter) {
super(seqSeriesReader, unseqSeriesReader);
public SeriesReaderWithValueFilter(Path seriesPath, Filter filter, QueryContext context)
throws StorageEngineException, IOException {
super(seriesPath, filter, context, false);
this.filter = filter;
}
public SeriesReaderWithValueFilter(IBatchReader seqResourceIterateReader,
IPointReader unseqResourceMergeReader, Filter filter) {
super(seqResourceIterateReader, unseqResourceMergeReader);
this.filter = filter;
}
......@@ -67,9 +82,8 @@ public class SeriesReaderWithValueFilter extends SeriesReaderWithoutValueFilter
}
}
@Override
public TimeValuePair current() throws IOException {
public TimeValuePair current() {
return timeValuePair;
}
}
}
\ No newline at end of file
......@@ -16,34 +16,80 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.seriesRelated;
package org.apache.iotdb.db.query.reader;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
import java.io.IOException;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
/**
* It used to merge sequence data and unsequence data with <br>only time filter or no
* filter.</br>
* To read series data without value filter, this class implements {@link IPointReader} for the
* data.
* <p>
* Note that filters include value filter and time filter. "without value filter" is equivalent to
* "with global time filter or simply without any filter".
*/
public class SeriesReaderWithoutValueFilter implements IPointReader {
private IBatchReader seqSeriesReader;
private IPointReader unseqSeriesReader;
private boolean hasCachedBatchData;
private BatchData batchData;
private IBatchReader seqResourceIterateReader;
private IPointReader unseqResourceMergeReader;
public SeriesReaderWithoutValueFilter(IBatchReader seqResourceIterateReader,
IPointReader unseqResourceMergeReader) {
this.seqResourceIterateReader = seqResourceIterateReader;
this.unseqResourceMergeReader = unseqResourceMergeReader;
this.hasCachedBatchData = false;
}
public SeriesReaderWithoutValueFilter(Path seriesPath, Filter timeFilter, QueryContext context)
throws StorageEngineException, IOException {
this(seriesPath, timeFilter, context, true);
}
/**
* merge sequence reader, unsequence reader.
* Constructor function.
*
* @param seriesPath the path of the series data
* @param filter filter condition
* @param context query context
* @param pushdownUnseq True to push down the filter on the unsequence TsFile resource; False not
* to.
*/
public SeriesReaderWithoutValueFilter(IBatchReader seqSeriesReader, IPointReader unseqSeriesReader) {
this.seqSeriesReader = seqSeriesReader;
this.unseqSeriesReader = unseqSeriesReader;
protected SeriesReaderWithoutValueFilter(Path seriesPath, Filter filter, QueryContext context,
boolean pushdownUnseq) throws StorageEngineException, IOException {
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(seriesPath, context);
// reader for sequence resources
IBatchReader seqResourceIterateReader = new SeqResourceIterateReader(
queryDataSource.getSeriesPath(), queryDataSource.getSeqResources(), filter, context);
// reader for unsequence resources
IPointReader unseqResourceMergeReader;
if (pushdownUnseq) {
unseqResourceMergeReader = new UnseqResourceMergeReader(seriesPath,
queryDataSource.getUnseqResources(), context, filter);
} else {
unseqResourceMergeReader = new UnseqResourceMergeReader(seriesPath,
queryDataSource.getUnseqResources(), context, null);
}
this.seqResourceIterateReader = seqResourceIterateReader;
this.unseqResourceMergeReader = unseqResourceMergeReader;
this.hasCachedBatchData = false;
}
......@@ -52,56 +98,47 @@ public class SeriesReaderWithoutValueFilter implements IPointReader {
if (hasNextInBatchDataOrBatchReader()) {
return true;
}
// has value in pointReader
return unseqSeriesReader != null && unseqSeriesReader.hasNext();
return unseqResourceMergeReader != null && unseqResourceMergeReader.hasNext();
}
@Override
public TimeValuePair next() throws IOException {
boolean hasNextBatch = hasNextInBatchDataOrBatchReader();
boolean hasNextPoint = unseqResourceMergeReader != null && unseqResourceMergeReader.hasNext();
// has next in both batch reader and point reader
if (hasNextInBothReader()) {
long timeInPointReader = unseqSeriesReader.current().getTimestamp();
if (hasNextBatch && hasNextPoint) {
long timeInPointReader = unseqResourceMergeReader.current().getTimestamp();
long timeInBatchData = batchData.currentTime();
if (timeInPointReader > timeInBatchData) {
TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(batchData);
batchData.next();
return timeValuePair;
} else if (timeInPointReader == timeInBatchData) {
// Note that batchData here still moves next even though the current data to be read is
// overwritten by unsequence data source. Only in this way can hasNext() work correctly.
batchData.next();
return unseqSeriesReader.next();
return unseqResourceMergeReader.next();
} else {
return unseqSeriesReader.next();
return unseqResourceMergeReader.next();
}
}
// only has next in batch reader
if (hasNextInBatchDataOrBatchReader()) {
if (hasNextBatch) {
TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(batchData);
batchData.next();
return timeValuePair;
}
// only has next in point reader
if (unseqSeriesReader != null && unseqSeriesReader.hasNext()) {
return unseqSeriesReader.next();
if (hasNextPoint) {
return unseqResourceMergeReader.next();
}
return null;
}
/**
* judge if has next in both batch record and pointReader.
*/
private boolean hasNextInBothReader() throws IOException {
if (!hasNextInBatchDataOrBatchReader()) {
return false;
}
return unseqSeriesReader != null && unseqSeriesReader.hasNext();
return null;
}
/**
* judge if has next in batch record, either in batch data or in batch reader.
*/
private boolean hasNextInBatchDataOrBatchReader() throws IOException {
// has value in batchData
if (hasCachedBatchData && batchData.hasNext()) {
......@@ -111,8 +148,8 @@ public class SeriesReaderWithoutValueFilter implements IPointReader {
}
// has value in batchReader
while (seqSeriesReader != null && seqSeriesReader.hasNext()) {
batchData = seqSeriesReader.nextBatch();
while (seqResourceIterateReader != null && seqResourceIterateReader.hasNext()) {
batchData = seqResourceIterateReader.nextBatch();
if (batchData.hasNext()) {
hasCachedBatchData = true;
return true;
......@@ -128,7 +165,7 @@ public class SeriesReaderWithoutValueFilter implements IPointReader {
@Override
public void close() throws IOException {
seqSeriesReader.close();
unseqSeriesReader.close();
seqResourceIterateReader.close();
unseqResourceMergeReader.close();
}
}
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
......@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.sequence;
package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IAggregateReader;
......@@ -24,7 +24,7 @@ import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.read.common.BatchData;
/**
* get data sequentially from the reader list.
* This class implements {@link IAggregateReader} for sequential data sources.
*/
public abstract class IterateReader implements IAggregateReader {
......@@ -50,9 +50,8 @@ public abstract class IterateReader implements IAggregateReader {
}
while (nextSeriesReaderIndex < readerSize) {
//seqResourceSeriesReaderList.get(nextSeriesReaderIndex++)
boolean statisfyed = constructNextReader(nextSeriesReaderIndex++);
if (statisfyed && currentSeriesReader.hasNext()) {
boolean isConstructed = constructNextReader(nextSeriesReaderIndex++);
if (isConstructed && currentSeriesReader.hasNext()) {
curReaderInitialized = true;
return true;
}
......@@ -60,7 +59,14 @@ public abstract class IterateReader implements IAggregateReader {
return false;
}
public abstract boolean constructNextReader(int idx) throws IOException;
/**
* If the idx-th data source in order needs reading, construct <code>IAggregateReader</code> for
* it, assign to <code>currentSeriesReader</code> and return true. Otherwise, return false.
*
* @param idx the index of the data source
* @return True if the reader is constructed; False if not.
*/
protected abstract boolean constructNextReader(int idx) throws IOException;
@Override
public BatchData nextBatch() throws IOException {
......
......@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.unsequence;
package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
import java.util.ArrayList;
......@@ -25,25 +25,20 @@ import java.util.PriorityQueue;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
/**
* This class is used to read unsequence data in: 1) sealed tsfile resources,
* 2) unsealed tsfile resources, which include data on disk and in memtables
* that will be flushing to unsealed tsfile resources.
/**
* This class implements {@link IPointReader} for data sources with different priorities.
*/
public class UnsequenceSeriesReader implements IPointReader {
public class PriorityMergeReader implements IPointReader {
private List<IPointReader> unseqResourceSeriesReaderList = new ArrayList<>();
private List<IPointReader> readerList = new ArrayList<>();
private List<Integer> priorityList = new ArrayList<>();
private PriorityQueue<Element> heap = new PriorityQueue<>();
/**
* The bigger the priority value is, the higher the priority of this reader is.
*/
public void addReaderWithPriority(IPointReader reader, int priority) throws IOException {
if (reader.hasNext()) {
heap.add(new Element(unseqResourceSeriesReaderList.size(), reader.next(), priority));
heap.add(new Element(readerList.size(), reader.next(), priority));
}
unseqResourceSeriesReaderList.add(reader);
readerList.add(reader);
priorityList.add(priority);
}
......@@ -68,7 +63,7 @@ public class UnsequenceSeriesReader implements IPointReader {
while (!heap.isEmpty() && heap.peek().timeValuePair.getTimestamp() == top.timeValuePair
.getTimestamp()) {
Element e = heap.poll();
IPointReader reader = unseqResourceSeriesReaderList.get(e.index);
IPointReader reader = readerList.get(e.index);
if (reader.hasNext()) {
heap.add(new Element(e.index, reader.next(), priorityList.get(e.index)));
}
......@@ -77,7 +72,7 @@ public class UnsequenceSeriesReader implements IPointReader {
@Override
public void close() throws IOException {
for (IPointReader reader : unseqResourceSeriesReaderList) {
for (IPointReader reader : readerList) {
reader.close();
}
}
......@@ -95,7 +90,8 @@ public class UnsequenceSeriesReader implements IPointReader {
}
@Override
public int compareTo(Element o) {
public int compareTo(
Element o) {
if (this.timeValuePair.getTimestamp() > o.timeValuePair.getTimestamp()) {
return 1;
......
......@@ -16,33 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.merge;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
/**
* <p>
* Usage: Get value in timestamp by sorting time-value pair in multiple readers with time and
* priority. (1) merge multiple chunk group readers in the unsequence file, e.g.,
* SeriesReaderFactoryImpl.createUnseqSeriesReaderByTimestamp; (2)merge sequence reader and
* unsequence reader, e.g., SeriesReaderFactoryImpl.createSeriesReaderByTimestamp.
* </p>
* This class implements {@link IReaderByTimestamp} for data sources with different priorities.
*/
public class SeriesReaderByTimestamp implements IReaderByTimeStamp {
public class PriorityMergeReaderByTimestamp implements IReaderByTimestamp {
private List<IReaderByTimeStamp> readerList = new ArrayList<>();
private List<IReaderByTimestamp> readerList = new ArrayList<>();
private List<Integer> priorityList = new ArrayList<>();
/**
* NOTE: This function doesn't sort reader by priority. So you have to call this function in order of
* reader priority from small to large.
*/
public void addReaderWithPriority(IReaderByTimeStamp reader, int priority) {
public void addReaderWithPriority(IReaderByTimestamp reader, int priority) {
readerList.add(reader);
priorityList.add(priority);
}
......@@ -53,20 +42,34 @@ public class SeriesReaderByTimestamp implements IReaderByTimeStamp {
for (int i = readerList.size() - 1; i >= 0; i--) {
value = readerList.get(i).getValueInTimestamp(timestamp);
if (value != null) {
// Note that the remaining readers do not perform getValueInTimestamp. As a result,
// the traditional implementation of hasNext will lead to unregulated results.
return value;
}
}
return value;
}
/**
* This is an empty method.
* <p>
* Two reasons why this is left as an empty method:
* <p>
* 1) Because of the <code>getValueInTimestamp</code> in this class, the traditional
* implementation of hasNext will lead to unregulated results here.
* <p>
* 2) <code>hasNext</code> of <code>PriorityMergeReaderByTimestamp</code> is not used.
* <p>
* In contrast, <code>hasNext</code> of <code>DiskChunkReaderByTimestamp</code> is used in
* <code>getValueInTimestamp</code> of <code>UnSealedTsFileReaderByTimestamp</code> because disk
* chunks are chronologically ordered in an unsealed TsFile.
* <p>
* <code>hasNext</code> of <code>UnSealedTsFileReaderByTimestamp</code> and
* <code>FileSeriesReaderByTimestampAdapter</code> are used in <code>getValueInTimestamp</code>
* of <code>SeqResourceReaderByTimestamp</code>, because they can be chronologically ordered too.
*/
@Override
public boolean hasNext() throws IOException {
for (int i = readerList.size() - 1; i >= 0; i--) {
if (readerList.get(i).hasNext()) {
return true;
}
}
return false;
throw new IOException("hasNext in PriorityMergeReaderByTimestamp is an empty method.");
}
}
......@@ -24,7 +24,7 @@ import static org.apache.iotdb.tsfile.read.expression.ExpressionType.SERIES;
import java.io.IOException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithValueFilter;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
......@@ -52,9 +52,7 @@ public class EngineNodeConstructor extends AbstractNodeConstructor {
try {
Filter filter = ((SingleSeriesExpression) expression).getFilter();
Path path = ((SingleSeriesExpression) expression).getSeriesPath();
return new EngineLeafNode(
SeriesReaderFactoryImpl.getInstance()
.createSeriesReaderWithValueFilter(path, filter, context));
return new EngineLeafNode(new SeriesReaderWithValueFilter(path, filter, context));
} catch (IOException e) {
throw new StorageEngineException(e);
}
......
......@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader;
import java.io.IOException;
......
......@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.sequence;
package org.apache.iotdb.db.query.reader.fileRelated;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
......@@ -28,16 +29,16 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.junit.Assert;
import org.junit.Test;
public class UnsealedSeqReaderTest extends ReaderTestHelper {
public class UnSealedTsFileReaderTest extends ReaderTestHelper {
private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
@Test
public void testUnSealedReader() throws IOException {
public void testUnSealedTsFileIterateReader() throws IOException {
QueryDataSource queryDataSource = storageGroupProcessor.query(deviceId, measurementId, context);
TsFileResource resource = queryDataSource.getSeqResources().get(0);
Assert.assertEquals(false, resource.isClosed());
UnSealedTsFileReader reader = new UnSealedTsFileReader(resource, null, false);
UnSealedTsFileIterateReader reader = new UnSealedTsFileIterateReader(resource, null, false);
long time = 999;
while (reader.hasNext()) {
BatchData batchData = reader.nextBatch();
......@@ -51,19 +52,27 @@ public class UnsealedSeqReaderTest extends ReaderTestHelper {
}
@Test
public void testUnSealedByTimestampReader() throws IOException {
public void testUnSealedTsFileReaderByTimestamp() throws IOException {
QueryDataSource queryDataSource = storageGroupProcessor.query(deviceId, measurementId, context);
TsFileResource resource = queryDataSource.getSeqResources().get(0);
Assert.assertEquals(false, resource.isClosed());
UnSealedTsFileReaderByTimestamp reader = new UnSealedTsFileReaderByTimestamp(
resource);
for (int time = 1000; time <= 3020; time += 10) {
// unSealedTsFileDiskReaderByTs
for (int time = 1000; time <= 3019; time += 10) {
int value = (int) reader.getValueInTimestamp(time);
Assert.assertEquals(time, value);
}
// unSealedTsFileMemReaderByTs
for (int time = 3020; time <= 3029; time += 5) {
int value = (int) reader.getValueInTimestamp(time);
Assert.assertEquals(time, value);
}
Assert.assertEquals(true, reader.hasNext());
for (int time = 3050; time <= 3080; time += 10) {
Integer value = (Integer) reader.getValueInTimestamp(time);
Assert.assertEquals(null, value);
......@@ -86,7 +95,7 @@ public class UnsealedSeqReaderTest extends ReaderTestHelper {
insertOneRecord(j, j);
}
storageGroupProcessor.getWorkSequenceTsFileProcessor().syncFlush();
for (int j = 3020; j <= 3029; j++) {
for (int j = 3020; j <= 3029; j = j + 1) {
insertOneRecord(j, j);
}
}
......
......@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.sequence;
package org.apache.iotdb.db.query.reader.resourceRelated;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
......@@ -28,17 +28,16 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.Assert;
import org.junit.Test;
public class SeqDataReaderTest extends ReaderTestHelper {
public class SeqResourceReaderTest extends ReaderTestHelper {
private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
@Test
public void testSeqReader() throws IOException {
public void testSeqResourceIterateReader() throws IOException {
QueryDataSource queryDataSource = storageGroupProcessor.query(deviceId, measurementId, context);
Path path = new Path(deviceId, measurementId);
SequenceSeriesReader reader = new SequenceSeriesReader(path,
queryDataSource.getSeqResources(), null,
EnvironmentUtils.TEST_QUERY_CONTEXT);
SeqResourceIterateReader reader = new SeqResourceIterateReader(path,
queryDataSource.getSeqResources(), null, EnvironmentUtils.TEST_QUERY_CONTEXT);
long time = 999;
while (reader.hasNext()) {
BatchData batchData = reader.nextBatch();
......@@ -48,29 +47,32 @@ public class SeqDataReaderTest extends ReaderTestHelper {
batchData.next();
}
}
Assert.assertEquals(3029L, time);
Assert.assertEquals(5049L, time);
}
@Test
public void testSeqByTimestampReader() throws IOException {
public void testSeqResourceReaderByTimestamp() throws IOException {
QueryDataSource queryDataSource = storageGroupProcessor.query(deviceId, measurementId, context);
Path path = new Path(deviceId, measurementId);
SequenceSeriesReaderByTimestamp reader = new SequenceSeriesReaderByTimestamp(path,
SeqResourceReaderByTimestamp reader = new SeqResourceReaderByTimestamp(path,
queryDataSource.getSeqResources(), EnvironmentUtils.TEST_QUERY_CONTEXT);
for (int time = 1000; time <= 3020; time += 10) {
for (int time = 1000; time <= 3019; time += 1) {
int value = (int) reader.getValueInTimestamp(time);
Assert.assertEquals(time, value);
}
// skip reading
for (int time = 5040; time <= 5049; time += 2) {
int value = (int) reader.getValueInTimestamp(time);
Assert.assertEquals(time, value);
}
Assert.assertTrue(reader.hasNext());
Assert.assertEquals(true, reader.hasNext());
for (int time = 3050; time <= 3080; time += 10) {
Integer value = (Integer) reader.getValueInTimestamp(time);
Assert.assertEquals(null, value);
for (int time = 5050; time <= 5059; time += 1) {
Assert.assertNull(reader.getValueInTimestamp(time));
}
Assert.assertEquals(false, reader.hasNext());
}
}
@Override
protected void insertData() throws IOException {
......@@ -91,7 +93,17 @@ public class SeqDataReaderTest extends ReaderTestHelper {
assert storageGroupProcessor.getWorkSequenceTsFileProcessor() == null;
for (int j = 3020; j <= 3029; j++) {
for (int j = 3020; j <= 5029; j++) {
insertOneRecord(j, j);
}
storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
for (int j = 5030; j <= 5039; j++) { // usually this is unsealed
insertOneRecord(j, j);
}
storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
for (int j = 5040; j <= 5049; j++) {
insertOneRecord(j, j);
}
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.resourceRelated;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.ReaderTestHelper;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.junit.Assert;
import org.junit.Test;
public class UnseqResourceReaderTest extends ReaderTestHelper {

  private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;

  /**
   * Prepares one sequence data source (timestamps 1-100) and several unsequence data
   * sources: a file covering timestamps 1-20, a file holding zero values at the odd
   * timestamps in [21, 29] (expected to be overwritten when read), a file holding the
   * real values at those same timestamps, and finally a single record (2, 100) that the
   * tests below observe as the winning value at timestamp 2.
   */
  @Override
  protected void insertData() throws IOException {
    // sequence data: timestamps 1..100, value == timestamp
    for (int time = 1; time <= 100; time++) {
      insertOneRecord(time, time);
    }
    storageGroupProcessor.getWorkSequenceTsFileProcessor().syncFlush();

    // unsequence data: 10..1 written backwards, then 11..20 forwards
    for (int time = 10; time >= 1; time--) {
      insertOneRecord(time, time);
    }
    for (int time = 11; time <= 20; time++) {
      insertOneRecord(time, time);
    }
    storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();

    // zero values at the odd timestamps 21..29; overwritten by the next writes when read
    for (int time = 21; time <= 30; time += 2) {
      insertOneRecord(time, 0);
    }
    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();

    // the real values at the odd timestamps 21..29
    for (int time = 21; time <= 30; time += 2) {
      insertOneRecord(time, time);
    }
    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();

    // overrides (2, 2); the tests assert 100 is the value actually read at timestamp 2
    insertOneRecord(2, 100);
  }

  @Test
  public void testUnseqResourceMergeReaderWithGlobalTimeFilter() throws IOException {
    Path seriesPath = new Path(deviceId, measurementId);
    QueryDataSource dataSource = storageGroupProcessor.query(deviceId, measurementId, context);
    IPointReader reader = new UnseqResourceMergeReader(seriesPath,
        dataSource.getUnseqResources(), EnvironmentUtils.TEST_QUERY_CONTEXT, TimeFilter.lt(4));

    int pointCount = 0;
    while (reader.hasNext()) {
      TimeValuePair pair = reader.next();
      pointCount++;
      Assert.assertEquals(pointCount, pair.getTimestamp());
      // (2, 2) is overwritten by the later record (2, 100)
      int expectedValue = (pointCount == 2) ? 100 : pointCount;
      Assert.assertEquals(expectedValue, pair.getValue().getInt());
    }
    // timestamps 1, 2 and 3 pass the time filter "< 4"
    Assert.assertEquals(3, pointCount);
  }

  @Test
  public void testUnseqResourceMergeReaderWithoutFilter() throws IOException {
    Path seriesPath = new Path(deviceId, measurementId);
    QueryDataSource dataSource = storageGroupProcessor
        .query(deviceId, measurementId, context);
    IPointReader reader = new UnseqResourceMergeReader(seriesPath,
        dataSource.getUnseqResources(), EnvironmentUtils.TEST_QUERY_CONTEXT, null);

    int pointCount = 0;
    while (reader.hasNext()) {
      TimeValuePair pair = reader.next();
      pointCount++;
      if (pointCount == 2) {
        // (2, 2) is overwritten by the later record (2, 100)
        Assert.assertEquals(2, pair.getTimestamp());
        Assert.assertEquals(100, pair.getValue().getInt());
      } else if (pointCount < 21) {
        Assert.assertEquals(pointCount, pair.getTimestamp());
        Assert.assertEquals(pointCount, pair.getValue().getInt());
      }
    }
    // 20 points at timestamps 1..20 plus 5 points at the odd timestamps 21..29
    Assert.assertEquals(25, pointCount);
  }

  @Test
  public void testUnseqResourceReaderByTimestamp() throws IOException, StorageEngineException {
    Path seriesPath = new Path(deviceId, measurementId);
    QueryDataSource dataSource = storageGroupProcessor.query(deviceId, measurementId, context);
    IReaderByTimestamp reader = new UnseqResourceReaderByTimestamp(seriesPath,
        dataSource.getUnseqResources(), EnvironmentUtils.TEST_QUERY_CONTEXT);

    for (long time = 0; time <= 25; time++) {
      Integer value = (Integer) reader.getValueInTimestamp(time);
      if (time == 0) {
        // before the first data point
        Assert.assertNull(value);
      } else if (time == 2) {
        // (2, 2) is overwritten by the later record (2, 100)
        Assert.assertEquals(100, (int) value);
      } else if (time >= 21 && time % 2 == 0) {
        // only odd timestamps were written in [21, 29]
        Assert.assertNull(value);
      } else {
        Assert.assertEquals(time, (int) value);
      }
    }
    Assert.assertNull(reader.getValueInTimestamp(26));
    Assert.assertEquals(27, (int) reader.getValueInTimestamp(27));
    // Note that reading by the same timestamp twice is an error demonstration.
    Assert.assertEquals(0, (int) reader.getValueInTimestamp(27));
    Assert.assertEquals(29, (int) reader.getValueInTimestamp(29));
    // Note that reading by the same timestamp twice is an error demonstration.
    Assert.assertEquals(0, (int) reader.getValueInTimestamp(29));
  }
}
......@@ -17,18 +17,22 @@
* under the License.
*/
package org.apache.iotdb.db.query.reader;
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.iotdb.db.query.reader.IBatchReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.junit.Assert;
/**
* This is a test utility class.
*/
public class FakedIBatchPoint implements IBatchReader {
private Iterator<TimeValuePair> iterator;
......@@ -37,22 +41,19 @@ public class FakedIBatchPoint implements IBatchReader {
private boolean hasEmptyBatch;
private Random random;
private TimeValuePair timeValuePair;
public FakedIBatchPoint(long startTime, int size, int interval, int modValue,
boolean hasEmptyBatch) {
long time = startTime;
List<TimeValuePair> list = new ArrayList<>();
for (int i = 0; i < size; i++) {
list.add(
new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT64,
time % modValue)));
new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT64, time % modValue)));
time += interval;
}
iterator = list.iterator();
this.hasCachedBatchData = false;
this.hasEmptyBatch = hasEmptyBatch;
this.random = new Random();
this.hasCachedBatchData = false;
}
public FakedIBatchPoint(long startTime, int size, int interval, int modValue) {
......@@ -84,11 +85,6 @@ public class FakedIBatchPoint implements IBatchReader {
}
@Override
public void close() {
}
private void constructBatchData() {
int num = random.nextInt(10);
if (!hasEmptyBatch) {
......@@ -96,7 +92,7 @@ public class FakedIBatchPoint implements IBatchReader {
}
batchData = new BatchData(TSDataType.INT64, true);
while (num > 0 && iterator.hasNext()) {
timeValuePair = iterator.next();
TimeValuePair timeValuePair = iterator.next();
batchData.putTime(timeValuePair.getTimestamp());
batchData.putLong(timeValuePair.getValue().getLong());
num--;
......@@ -105,4 +101,10 @@ public class FakedIBatchPoint implements IBatchReader {
Assert.assertTrue(batchData.hasNext());
}
}
@Override
public void close() {
}
}
......@@ -17,19 +17,23 @@
* under the License.
*/
package org.apache.iotdb.db.query.reader;
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/**
* This is a test utility class.
*/
public class FakedIPointReader implements IPointReader {
private Iterator<TimeValuePair> iterator;
private boolean hasCachedTimeValuePair = false;
private boolean hasCachedTimeValuePair;
private TimeValuePair cachedTimeValuePair;
public FakedIPointReader(long startTime, int size, int interval, int modValue) {
......@@ -41,6 +45,7 @@ public class FakedIPointReader implements IPointReader {
time += interval;
}
iterator = list.iterator();
hasCachedTimeValuePair = false;
}
@Override
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.junit.Assert;
import org.junit.Test;
public class SeriesReaderWithValueFilterTest {

  // Declared with the concrete type under test (formerly the misleading supertype
  // SeriesReaderWithoutValueFilter) so the class reads as a test of the value-filter reader.
  private SeriesReaderWithValueFilter reader;

  private void init() {
    // sequence source: 1000 points from time 100, step 5, value = time % 2:
    // (100,0),(105,1),(110,0),(115,1),(120,0),...
    IBatchReader batchReader = new FakedIBatchPoint(100, 1000, 5, 2);
    // unsequence source: 500 points from time 100, step 5, value = time % 4:
    // (100,0),(105,1),(110,2),(115,3),(120,0),...
    IPointReader pointReader = new FakedIPointReader(100, 500, 5, 4);
    reader = new SeriesReaderWithValueFilter(batchReader, pointReader, ValueFilter.eq(0L));
  }

  /**
   * While the unsequence source overlaps (times 100..2595) its values win, so value == 0
   * every 20 time units: 125 points at 100, 120, 140, ... 2580. After it is exhausted,
   * only the sequence values (time % 2) remain, so value == 0 every 10 time units:
   * 250 points at 2600, 2610, ... 5090. Hence 375 points in total.
   */
  @Test
  public void test() throws IOException {
    init();
    int cnt = 0;
    long expectedTime = 100;
    while (reader.hasNext()) {
      TimeValuePair timeValuePair = reader.next();
      Assert.assertEquals(expectedTime, timeValuePair.getTimestamp());
      // step 20 within the overlap (first 125 points), step 10 afterwards
      expectedTime += (cnt < 125) ? 20 : 10;
      cnt++;
    }
    Assert.assertEquals(375, cnt);
  }
}
......@@ -17,9 +17,11 @@
* under the License.
*/
package org.apache.iotdb.db.query.reader;
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.junit.Assert;
import org.junit.Test;
......@@ -31,21 +33,21 @@ public class SeriesReaderWithoutValueFilterTest {
private void init() {
IBatchReader batchReader1 = new FakedIBatchPoint(100, 1000, 7, 11);
IBatchReader batchReader2 = new FakedIBatchPoint(100, 1000, 7, 11);
IPointReader pointReader = new FakedIPointReader(20, 500, 11, 19);
reader1 = new SeriesReaderWithoutValueFilter(batchReader1, pointReader);
IBatchReader batchReader2 = new FakedIBatchPoint(100, 1000, 7, 11);
reader2 = new SeriesReaderWithoutValueFilter(batchReader2, null);
}
@Test
public void test() throws IOException {
init();
testWithOutNullReader();
testWithoutNullReader();
testWithNullPointReader();
}
private void testWithOutNullReader() throws IOException {
private void testWithoutNullReader() throws IOException {
int cnt = 0;
while (reader1.hasNext()) {
TimeValuePair timeValuePair = reader1.next();
......@@ -60,7 +62,6 @@ public class SeriesReaderWithoutValueFilterTest {
}
}
Assert.assertEquals(1430, cnt);
System.out.println("testWithOutNullReader-cnt:" + cnt);
}
private void testWithNullPointReader() throws IOException {
......@@ -71,6 +72,5 @@ public class SeriesReaderWithoutValueFilterTest {
cnt++;
}
Assert.assertEquals(1000, cnt);
System.out.println("testWithNullPointReader-cnt:" + cnt);
}
}
\ No newline at end of file
......@@ -17,17 +17,20 @@
* under the License.
*/
package org.apache.iotdb.db.query.reader;
package org.apache.iotdb.db.query.reader.universal;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class FakedSeriesReaderByTimestamp implements IReaderByTimeStamp {
/**
* This is a test utility class.
*/
public class FakedSeriesReaderByTimestamp implements IReaderByTimestamp {
private Iterator<TimeValuePair> iterator;
private boolean hasCachedTimeValuePair = false;
......
......@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.merge;
package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
import java.util.ArrayList;
......@@ -24,25 +25,22 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.Assert;
import org.junit.Test;
public class SeriesReaderByTimestampTest {
public class PriorityMergeReaderByTimestampTest {
@Test
public void test() throws IOException {
FakedPrioritySeriesReaderByTimestamp reader1 = new FakedPrioritySeriesReaderByTimestamp(100,
200, 5, 11);
FakedPrioritySeriesReaderByTimestamp reader2 = new FakedPrioritySeriesReaderByTimestamp(850,
200, 7, 19);
FakedPrioritySeriesReaderByTimestamp reader3 = new FakedPrioritySeriesReaderByTimestamp(1080,
200, 13, 31);
SeriesReaderByTimestamp priorityReader = new SeriesReaderByTimestamp();
IReaderByTimestamp reader1 = new FakedReaderByTimestamp(100, 200, 5, 11);
IReaderByTimestamp reader2 = new FakedReaderByTimestamp(850, 200, 7, 19);
IReaderByTimestamp reader3 = new FakedReaderByTimestamp(1080, 200, 13, 31);
PriorityMergeReaderByTimestamp priorityReader = new PriorityMergeReaderByTimestamp();
priorityReader.addReaderWithPriority(reader1, 1);
priorityReader.addReaderWithPriority(reader2, 2);
priorityReader.addReaderWithPriority(reader3, 3);
......@@ -50,9 +48,9 @@ public class SeriesReaderByTimestampTest {
Random random = new Random();
for (long time = 4; time < 1080 + 200 * 13 + 600; ) {
Long value = (Long) priorityReader.getValueInTimestamp(time);
if (time < 1080 + 199 * 13) {
Assert.assertTrue(priorityReader.hasNext());
}
// if (time < 1080 + 199 * 13) {
// Assert.assertTrue(priorityReader.hasNext());
// }
//System.out.println("time = " + time + " value = " + value);
if (time < 100) {
......@@ -93,7 +91,7 @@ public class SeriesReaderByTimestampTest {
}
public static class FakedPrioritySeriesReaderByTimestamp implements IReaderByTimeStamp,
public static class FakedReaderByTimestamp implements IReaderByTimestamp,
IPointReader {
private Iterator<TimeValuePair> iterator;
......@@ -101,7 +99,7 @@ public class SeriesReaderByTimestampTest {
private boolean hasCachedTimeValuePair;
private TimeValuePair cachedTimeValuePair;
public FakedPrioritySeriesReaderByTimestamp(long startTime, int size, int interval,
public FakedReaderByTimestamp(long startTime, int size, int interval,
int modValue) {
long time = startTime;
List<TimeValuePair> list = new ArrayList<>();
......
......@@ -16,17 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.merge;
package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.unsequence.UnsequenceSeriesReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.junit.Assert;
import org.junit.Test;
public class SeriesMergeSortReaderTest {
public class PriorityMergeReaderTest {
@Test
public void test2S() throws IOException {
......@@ -52,15 +52,14 @@ public class SeriesMergeSortReaderTest {
}
private void test(long[] retTimestamp, long[] retValue, long[]... sources) throws IOException {
UnsequenceSeriesReader seriesMergeSortReader = new UnsequenceSeriesReader();
PriorityMergeReader priorityMergeReader = new PriorityMergeReader();
for (int i = 0; i < sources.length; i++) {
seriesMergeSortReader.addReaderWithPriority(
new FakedSeriesReader(sources[i], i + 1), i + 1);
priorityMergeReader.addReaderWithPriority(new FakedSeriesReader(sources[i], i + 1), i + 1);
}
int i = 0;
while (seriesMergeSortReader.hasNext()) {
TimeValuePair timeValuePair = seriesMergeSortReader.next();
while (priorityMergeReader.hasNext()) {
TimeValuePair timeValuePair = priorityMergeReader.next();
Assert.assertEquals(retTimestamp[i], timeValuePair.getTimestamp());
Assert.assertEquals(retValue[i], timeValuePair.getValue().getValue());
i++;
......
......@@ -16,24 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.merge;
package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.unsequence.UnsequenceSeriesReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.Assert;
import org.junit.Test;
/**
* Test {@code UnsequenceSeriesReader}
*/
public class UnsequenceSeriesReaderTest {
public class PriorityMergeReaderTest2 {
@Test
public void test() throws IOException {
......@@ -41,14 +38,14 @@ public class UnsequenceSeriesReaderTest {
FakedPrioritySeriesReader reader2 = new FakedPrioritySeriesReader(150, 60, 6, 19);
FakedPrioritySeriesReader reader3 = new FakedPrioritySeriesReader(180, 50, 7, 31);
UnsequenceSeriesReader unsequenceSeriesReader = new UnsequenceSeriesReader();
unsequenceSeriesReader.addReaderWithPriority(reader1, 3);
unsequenceSeriesReader.addReaderWithPriority(reader2, 2);
unsequenceSeriesReader.addReaderWithPriority(reader3, 1);
PriorityMergeReader priorityMergeReader = new PriorityMergeReader();
priorityMergeReader.addReaderWithPriority(reader1, 3);
priorityMergeReader.addReaderWithPriority(reader2, 2);
priorityMergeReader.addReaderWithPriority(reader3, 1);
int cnt = 0;
while (unsequenceSeriesReader.hasNext()) {
TimeValuePair timeValuePair = unsequenceSeriesReader.next();
while (priorityMergeReader.hasNext()) {
TimeValuePair timeValuePair = priorityMergeReader.next();
long time = timeValuePair.getTimestamp();
long value = timeValuePair.getValue().getLong();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.unsequence;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.IReaderByTimeStamp;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class UnseqSeriesReaderByTimestampTest {

  private final String systemDir = "data/info";
  private final String deviceId = "root.vehicle.d0";
  private final String measurementId = "s0";

  @Before
  public void setUp() throws Exception {
    MetadataManagerHelper.initMetadata();
    EnvironmentUtils.envSetUp();
  }

  @After
  public void tearDown() throws Exception {
    EnvironmentUtils.cleanEnv();
    EnvironmentUtils.cleanDir(systemDir);
  }

  /**
   * Writes timestamps 1..10, each flushed and sealed into its own TsFile, then overwrites
   * timestamp 2 with value 100 (left unsealed so it lands in unsequence space). Verifies that a
   * reader-by-timestamp returns the overwritten value at t=2 and the original value elsewhere.
   */
  @Test
  public void testUnseqSeriesReaderByTimestamp() throws IOException, StorageEngineException {
    // write: seal after every insert so the later overwrite becomes unsequence data
    for (int j = 1; j <= 10; j++) {
      TSRecord record = new TSRecord(j, deviceId);
      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
      StorageEngine.getInstance().insert(new InsertPlan(record));
      StorageEngine.getInstance().asyncFlushAndSealAllFiles();
    }

    // overwrite timestamp 2; deliberately NOT flushed/sealed
    TSRecord record = new TSRecord(2, deviceId);
    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(100)));
    StorageEngine.getInstance().insert(new InsertPlan(record));

    // query
    List<Path> paths = new ArrayList<>();
    paths.add(new Path(deviceId, measurementId));
    List<IReaderByTimeStamp> readers = SeriesReaderFactoryImpl.getInstance().
        createSeriesReadersByTimestamp(paths, EnvironmentUtils.TEST_QUERY_CONTEXT);
    Assert.assertEquals(1, readers.size());
    IReaderByTimeStamp reader = readers.get(0);
    for (long time = 1; time <= 10; time++) {
      // NOTE that the queried timestamps should be in strictly increasing order.
      Integer value = (Integer) reader.getValueInTimestamp(time);
      if (time == 2) {
        Assert.assertEquals(100, (int) value);
      } else {
        Assert.assertEquals(time, (int) value);
      }
    }
  }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.reader.unsequence;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.ReaderTestHelper;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.junit.Assert;
import org.junit.Test;
public class UnsequenceSeriesReaderTest extends ReaderTestHelper {
private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
@Override
protected void insertData() {
for (int j = 1; j <= 10; j++) {
insertOneRecord(j, j);
}
storageGroupProcessor.getWorkSequenceTsFileProcessor().asyncFlush();
for (int j = 10; j >= 1; j--) {
insertOneRecord(j, j);
}
storageGroupProcessor.getWorkSequenceTsFileProcessor().asyncFlush();
insertOneRecord(2, 100);
}
@Test
public void testUnseqSeriesReaderWithGlobalTimeFilter() throws IOException {
Path path = new Path(deviceId, measurementId);
QueryDataSource queryDataSource = storageGroupProcessor
.query(deviceId, measurementId, context);
IPointReader reader = SeriesReaderFactoryImpl.getInstance().createUnseqSeriesReader(path,
queryDataSource.getUnseqResources(), EnvironmentUtils.TEST_QUERY_CONTEXT,
TimeFilter.eq(4));
int cnt = 0;
while (reader.hasNext()) {
cnt++;
TimeValuePair timeValuePair = reader.next();
Assert.assertEquals(4, timeValuePair.getTimestamp());
Assert.assertEquals(4, timeValuePair.getValue().getInt());
}
Assert.assertEquals(1, cnt);
}
@Test
public void testUnseqSeriesReaderWithoutFilter() throws IOException {
Path path = new Path(deviceId, measurementId);
QueryDataSource queryDataSource = storageGroupProcessor
.query(deviceId, measurementId, context);
IPointReader reader = SeriesReaderFactoryImpl.getInstance().createUnseqSeriesReader(path,
queryDataSource.getUnseqResources(), EnvironmentUtils.TEST_QUERY_CONTEXT, null);
int cnt = 0;
while (reader.hasNext()) {
cnt++;
TimeValuePair timeValuePair = reader.next();
if (cnt == 2) {
Assert.assertEquals(2, timeValuePair.getTimestamp());
Assert.assertEquals(100, timeValuePair.getValue().getInt());
} else {
Assert.assertEquals(cnt, timeValuePair.getTimestamp());
Assert.assertEquals(cnt, timeValuePair.getValue().getInt());
}
}
Assert.assertEquals(10, cnt);
}
// Note that createUnSeqByTimestampReader is of private use.
}
......@@ -28,8 +28,8 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.reader.unsequence.UnsequenceSeriesReader;
import org.apache.iotdb.db.query.reader.unsequence.DiskChunkReader;
import org.apache.iotdb.db.query.reader.chunkRelated.DiskChunkReader;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
......@@ -135,18 +135,16 @@ public class UnseqTsFileRecoverTest {
Path path = new Path("device1", "sensor1");
UnsequenceSeriesReader unSeqMergeReader = new UnsequenceSeriesReader();
PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
int priorityValue = 1;
for (ChunkMetaData chunkMetaData : metadataQuerier.getChunkMetaDataList(path)) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
unSeqMergeReader
.addReaderWithPriority(new DiskChunkReader(chunkReader),
priorityValue);
unSeqMergeReader.addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue);
priorityValue++;
}
for (int i = 0; i < 10; i++) {
TimeValuePair timeValuePair = unSeqMergeReader.current();
assertEquals(i, timeValuePair.getTimestamp());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册