Commit f951bd37 authored by 刘涛华, committed by Jialin Qiao

[IOTDB-306] new reader with ChunkMetaData for aggregateFunction (#614)

* new hasNextChunk and hasNextPageInCurrentChunk for IAggregateReader
Parent: 122fba05
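
In short: this change lets aggregate functions consume the pre-computed statistics stored in each chunk's ChunkMetaData (point count, sum, first/last value, start/end time) instead of decompressing and scanning every page, whenever a chunk does not overlap unsequence data and fully satisfies the time filter. A minimal, self-contained sketch of the idea follows; ChunkStats and SumAggregate are illustrative names, not the IoTDB API:

    // Illustrative only: fold a SUM aggregate from per-chunk statistics in O(1)
    // per chunk, mirroring what calculateValueFromChunkMetaData does below.
    class ChunkStats {
      final long startTime, endTime, numOfPoints;
      final double sumValue;

      ChunkStats(long startTime, long endTime, long numOfPoints, double sumValue) {
        this.startTime = startTime;
        this.endTime = endTime;
        this.numOfPoints = numOfPoints;
        this.sumValue = sumValue;
      }
    }

    class SumAggregate {
      private double sum;

      // Consume write-time statistics instead of decoding pages.
      void updateFromStats(ChunkStats stats) {
        sum += stats.sumValue;
      }

      double result() {
        return sum;
      }
    }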
......@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -119,4 +120,7 @@ public abstract class AggregateFunction {
public TSDataType getResultDataType() {
return resultDataType;
}
public abstract void calculateValueFromChunkMetaData(
ChunkMetaData chunkMetaData) throws QueryProcessException;
}
......@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -163,6 +164,12 @@ public class AvgAggrFunc extends AggregateFunction {
return false;
}
@Override
public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
sum += chunkMetaData.getStatistics().getSumValue();
cnt += chunkMetaData.getNumOfPoints();
}
/**
* Return type name of aggregation
*/
......
......@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.slf4j.Logger;
......@@ -152,4 +153,11 @@ public class CountAggrFunc extends AggregateFunction {
public boolean isCalculatedAggregationResult() {
return false;
}
@Override
public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
long preValue = resultData.getLongRet();
preValue += chunkMetaData.getNumOfPoints();
resultData.setLongRet(preValue);
}
}
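
For COUNT the fast path is especially direct: the result over any set of usable chunks is just the sum of their point counts. A hedged sketch of that reduction (countFromMetadata and the pre-filtered chunk list are hypothetical; getNumOfPoints() is the ChunkMetaData accessor used in the override above):

    import java.util.List;

    class CountSketch {
      // Hypothetical helper: COUNT over chunks that already passed the
      // overlap and time-filter checks.
      static long countFromMetadata(List<ChunkMetaData> satisfiedChunks) {
        long count = 0;
        for (ChunkMetaData meta : satisfiedChunks) {
          count += meta.getNumOfPoints(); // statistics recorded at write time, no page I/O
        }
        return count;
      }
    }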
......@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -151,4 +152,18 @@ public class FirstValueAggrFunc extends AggregateFunction {
public boolean isCalculatedAggregationResult() {
return resultData.isSetTime();
}
@Override
public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData)
throws QueryProcessException {
if (resultData.isSetTime()) {
return;
}
Object firstVal = chunkMetaData.getStatistics().getFirstValue();
if (firstVal == null) {
throw new QueryProcessException("chunkMetaData contains no FIRST value");
}
resultData.putTimeAndValue(0, firstVal);
}
}
......@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -138,6 +139,12 @@ public class LastValueAggrFunc extends AggregateFunction {
return false;
}
@Override
public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
Object lastVal = chunkMetaData.getStatistics().getLastValue();
updateLastResult(chunkMetaData.getEndTime(), lastVal);
}
private void updateLastResult(long time, Object value) {
if (!resultData.isSetTime()) {
resultData.putTimeAndValue(time, value);
......
......@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -126,6 +127,12 @@ public class MaxTimeAggrFunc extends AggregateFunction {
return false;
}
@Override
public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
long maxTimestamp = chunkMetaData.getEndTime();
updateMaxTimeResult(0, maxTimestamp);
}
private void updateMaxTimeResult(long time, long value) {
if (!resultData.isSetValue() || value >= resultData.getLongRet()) {
resultData.setTimestamp(time);
......
......@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -159,6 +160,12 @@ public class MaxValueAggrFunc extends AggregateFunction {
return false;
}
@Override
public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
Comparable<Object> maxVal = (Comparable<Object>) chunkMetaData.getStatistics().getMaxValue();
updateResult(maxVal);
}
private void updateResult(Comparable<Object> maxVal) {
if (maxVal == null) {
return;
......
......@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -153,4 +154,13 @@ public class MinTimeAggrFunc extends AggregateFunction {
return resultData.isSetValue();
}
@Override
public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
if (resultData.isSetValue()) {
return;
}
long time = chunkMetaData.getStartTime();
resultData.putTimeAndValue(0, time);
}
}
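
Taken together, the overrides in this commit each read a different piece of chunk-level metadata: COUNT adds getNumOfPoints(); AVG adds getSumValue() and getNumOfPoints(); FIRST_VALUE takes getFirstValue() only while the result is unset; LAST_VALUE takes getLastValue() stamped with getEndTime(); MIN_TIME takes getStartTime() only while unset; MAX_TIME takes getEndTime(); and MIN_VALUE/MAX_VALUE take getMinValue()/getMaxValue().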
......@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
......@@ -154,6 +155,12 @@ public class MinValueAggrFunc extends AggregateFunction {
return false;
}
@Override
public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
Comparable<Object> minVal = (Comparable<Object>) chunkMetaData.getStatistics().getMinValue();
updateResult(minVal);
}
private void updateResult(Comparable<Object> minVal) {
if (minVal == null) {
return;
......
......@@ -30,8 +30,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.impl.FirstValueAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.LastValueAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.MinTimeAggrFunc;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
......@@ -45,6 +47,7 @@ import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
......@@ -134,10 +137,10 @@ public class AggregateEngineExecutor {
/**
   * calculate the aggregate result with only a time filter or no filter for one series.
   *
   * @param function         aggregate function
   * @param sequenceReader   sequence data reader
   * @param unSequenceReader unsequence data reader
   * @param filter           time filter or null
   * @return one series aggregate result data
*/
private AggreResultData aggregateWithoutValueFilter(AggregateFunction function,
......@@ -148,23 +151,38 @@ public class AggregateEngineExecutor {
filter);
}
-    while (sequenceReader.hasNext()) {
-      PageHeader pageHeader = sequenceReader.nextPageHeader();
-      // judge if overlap with unsequence data
-      if (canUseHeader(function, pageHeader, unSequenceReader, filter)) {
-        // cal by pageHeader
-        function.calculateValueFromPageHeader(pageHeader);
-        sequenceReader.skipPageData();
-      } else {
-        // cal by pageData
-        function.calculateValueFromPageData(sequenceReader.nextBatch(), unSequenceReader);
+    while (sequenceReader.hasNextChunk()) {
+      ChunkMetaData chunkMetaData = sequenceReader.nextChunkMeta();
+      if (chunkMetaData != null && canUseHeader(function, chunkMetaData.getStartTime(),
+          chunkMetaData.getEndTime(), unSequenceReader, filter)) {
+        function.calculateValueFromChunkMetaData(chunkMetaData);
+        if (isEarlyBreakFunc(function)) {
+          break;
+        }
+        continue;
+      }
+      while (sequenceReader.hasNextPageInCurrentChunk()) {
+        PageHeader pageHeader = sequenceReader.nextPageHeader();
+        // judge if it overlaps with unsequence data
+        if (pageHeader != null && canUseHeader(function, pageHeader.getStartTime(),
+            pageHeader.getEndTime(), unSequenceReader, filter)) {
+          // cal by pageHeader
+          function.calculateValueFromPageHeader(pageHeader);
+          if (isEarlyBreakFunc(function)) {
+            break;
+          }
+          sequenceReader.skipPageData();
+        } else {
+          // cal by pageData
+          function.calculateValueFromPageData(sequenceReader.nextBatch(), unSequenceReader);
+        }
-      if (function.isCalculatedAggregationResult()) {
-        return function.getResult();
+        if (function.isCalculatedAggregationResult()) {
+          return function.getResult();
+        }
       }
     }
// cal with unsequence data
if (unSequenceReader.hasNext()) {
function.calculateValueFromUnsequenceReader(unSequenceReader);
......@@ -172,20 +190,19 @@ public class AggregateEngineExecutor {
return function.getResult();
}
private boolean isEarlyBreakFunc(AggregateFunction function) {
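    // FIRST_VALUE and MIN_TIME are finalized by the first chunk or page that can be
    // used: sequence data is time-ordered, so no later data can change their result.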
    return function instanceof FirstValueAggrFunc || function instanceof MinTimeAggrFunc;
}
/**
   * determine whether the time range [minTime, maxTime] (taken from a page header or
   * from chunk metadata) can be used to compute aggregation results.
*/
-  private boolean canUseHeader(AggregateFunction function, PageHeader pageHeader,
+  private boolean canUseHeader(AggregateFunction function, long minTime, long maxTime,
       IPointReader unSequenceReader, Filter filter)
       throws IOException, QueryProcessException {
-    // if page data is memory data.
-    if (pageHeader == null) {
-      return false;
-    }
-    long minTime = pageHeader.getStartTime();
-    long maxTime = pageHeader.getEndTime();
     // If there are points in the range that do not satisfy the time filter,
     // the header statistics cannot be used to calculate.
     if (filter != null && !filter.containStartEndTime(minTime, maxTime)) {
......@@ -202,8 +219,8 @@ public class AggregateEngineExecutor {
/**
* handle last and max_time aggregate function with only time filter or no filter.
*
   * @param function         aggregate function
   * @param sequenceReader   sequence data reader
   * @param unSequenceReader unsequence data reader
   * @return BatchData aggregate result
*/
......@@ -211,41 +228,46 @@ public class AggregateEngineExecutor {
IAggregateReader sequenceReader, IPointReader unSequenceReader, Filter timeFilter)
throws IOException, QueryProcessException {
     long lastBatchTimeStamp = Long.MIN_VALUE;
-    boolean isChunkEnd = false;
-    while (sequenceReader.hasNext()) {
-      PageHeader pageHeader = sequenceReader.nextPageHeader();
-      // judge if overlap with unsequence data
-      if (canUseHeader(function, pageHeader, unSequenceReader, timeFilter)) {
-        // cal by pageHeader
-        function.calculateValueFromPageHeader(pageHeader);
-        sequenceReader.skipPageData();
-        if (lastBatchTimeStamp > pageHeader.getStartTime()) {
-          // the chunk is end.
-          isChunkEnd = true;
-        } else {
-          // current page and last page are in the same chunk.
-          lastBatchTimeStamp = pageHeader.getStartTime();
-        }
-      } else {
-        // cal by pageData
-        BatchData batchData = sequenceReader.nextBatch();
-        if (batchData.length() > 0) {
-          if (lastBatchTimeStamp > batchData.currentTime()) {
-            // the chunk is end.
-            isChunkEnd = true;
-          } else {
-            // current page and last page are in the same chunk.
-            lastBatchTimeStamp = batchData.currentTime();
-            function.calculateValueFromPageData(batchData, unSequenceReader);
-          }
-        }
-      }
-      if (isChunkEnd) {
-        break;
-      }
-    }
+    while (sequenceReader.hasNextChunk()) {
+      ChunkMetaData chunkMetaData = sequenceReader.nextChunkMeta();
+      // if chunkMetaData can be used, skip this chunk
+      if (chunkMetaData != null && canUseHeader(function, chunkMetaData.getStartTime(),
+          chunkMetaData.getEndTime(), unSequenceReader, timeFilter)) {
+        function.calculateValueFromChunkMetaData(chunkMetaData);
+        // if this chunk is the last chunk, break out of the loop
+        if (lastBatchTimeStamp > chunkMetaData.getStartTime()) {
+          break;
+        }
+        lastBatchTimeStamp = chunkMetaData.getStartTime();
+        continue;
+      }
+      // if chunkMetaData cannot be used, fall back to the page headers
+      while (sequenceReader.hasNextPageInCurrentChunk()) {
+        PageHeader pageHeader = sequenceReader.nextPageHeader();
+        if (pageHeader != null && canUseHeader(function, pageHeader.getStartTime(),
+            pageHeader.getEndTime(), unSequenceReader, timeFilter)) {
+          // cal by pageHeader
+          function.calculateValueFromPageHeader(pageHeader);
+          sequenceReader.skipPageData();
+          // if this page belongs to the last chunk, break out of the loop
+          if (lastBatchTimeStamp > pageHeader.getStartTime()) {
+            break;
+          }
+          lastBatchTimeStamp = pageHeader.getStartTime();
+          continue;
+        }
+        // cal by pageData
+        BatchData batchData = sequenceReader.nextBatch();
+        if (batchData.length() > 0) {
+          if (lastBatchTimeStamp > batchData.currentTime()) {
+            break;
+          }
+          lastBatchTimeStamp = batchData.currentTime();
+          function.calculateValueFromPageData(batchData, unSequenceReader);
+        }
+      }
     }
// cal with unsequence data
if (unSequenceReader.hasNext()) {
function.calculateValueFromUnsequenceReader(unSequenceReader);
......
......@@ -20,9 +20,15 @@ package org.apache.iotdb.db.query.reader;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
public interface IAggregateReader extends IBatchReader {
boolean hasNextChunk() throws IOException;
ChunkMetaData nextChunkMeta();
boolean hasNextPageInCurrentChunk() throws IOException;
/**
* Returns meta-information of batch data.
* <p>
......
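
The interface now exposes a two-level iteration protocol: an outer cursor over chunks (hasNextChunk / nextChunkMeta) and an inner cursor over the pages of the current chunk (hasNextPageInCurrentChunk / nextPageHeader). A minimal consumer sketch under stated assumptions: statisticsAreSufficient, processWholeChunk, and processPage are placeholders for the overlap/filter checks and aggregation calls seen in AggregateEngineExecutor, not IoTDB API; imports are as in the diff above.

    abstract class AggregateConsumer {
      abstract boolean statisticsAreSufficient(ChunkMetaData meta);  // overlap + filter checks
      abstract void processWholeChunk(ChunkMetaData meta);           // metadata fast path
      abstract void processPage(PageHeader header, IAggregateReader reader) throws IOException;

      void consume(IAggregateReader reader) throws IOException {
        while (reader.hasNextChunk()) {
          ChunkMetaData meta = reader.nextChunkMeta();               // null for in-memory chunks
          if (meta != null && statisticsAreSufficient(meta)) {
            processWholeChunk(meta);                                 // no page decoding at all
            continue;
          }
          while (reader.hasNextPageInCurrentChunk()) {               // page-level fallback
            processPage(reader.nextPageHeader(), reader);
          }
        }
      }
    }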
......@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.reader.chunkRelated;
import java.io.IOException;
import java.util.Iterator;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.query.reader.IAggregateReader;
......@@ -25,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.fileRelated.UnSealedTsFileIterateReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
......@@ -111,6 +113,21 @@ public class MemChunkReader implements IPointReader, IAggregateReader {
// Do nothing because mem chunk reader will not open files
}
@Override
public boolean hasNextChunk() throws IOException {
return hasNext();
}
@Override
public ChunkMetaData nextChunkMeta() {
return null;
}
@Override
public boolean hasNextPageInCurrentChunk() throws IOException {
return hasNext();
}
@Override
public PageHeader nextPageHeader() {
return null;
......
......@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.fileRelated;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
......@@ -42,6 +43,21 @@ public class FileSeriesReaderAdapter implements IAggregateReader {
this.fileSeriesReader = fileSeriesReader;
}
@Override
public boolean hasNextChunk() throws IOException {
return fileSeriesReader.hasNextChunk();
}
@Override
public ChunkMetaData nextChunkMeta() {
return fileSeriesReader.currentChunkMeta();
}
@Override
public boolean hasNextPageInCurrentChunk() throws IOException {
return fileSeriesReader.hasNextPageInCurrentChunk();
}
@Override
public PageHeader nextPageHeader() throws IOException {
return fileSeriesReader.nextPageHeader();
......
......@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.BatchData;
/**
......@@ -83,6 +84,35 @@ public abstract class IterateReader implements IAggregateReader {
currentSeriesReader.skipPageData();
}
@Override
public boolean hasNextChunk() throws IOException {
if (curReaderInitialized && currentSeriesReader.hasNextChunk()) {
return true;
} else {
curReaderInitialized = false;
}
while (nextSeriesReaderIndex < readerSize) {
boolean isConstructed = constructNextReader(nextSeriesReaderIndex++);
if (isConstructed && currentSeriesReader.hasNextChunk()) {
curReaderInitialized = true;
return true;
}
}
return false;
}
@Override
public boolean hasNextPageInCurrentChunk() throws IOException {
return currentSeriesReader.hasNextPageInCurrentChunk();
}
@Override
public ChunkMetaData nextChunkMeta() {
return currentSeriesReader.nextChunkMeta();
}
@Override
public void close() {
// file stream is managed in QueryResourceManager.
......
......@@ -38,6 +38,7 @@ public abstract class FileSeriesReader {
private int chunkToRead;
private BatchData data;
private ChunkMetaData chunkMetaData;
/**
* constructor of FileSeriesReader.
......@@ -49,9 +50,9 @@ public abstract class FileSeriesReader {
}
/**
-   * check if current chunk has next batch data.
+   * check if all chunks have next batch data.
    *
-   * @return True if current chunk has next batch data
+   * @return True if all chunks have next batch data
*/
public boolean hasNextBatch() throws IOException {
......@@ -107,4 +108,39 @@ public abstract class FileSeriesReader {
private ChunkMetaData nextChunkMeta() {
return chunkMetaDataList.get(chunkToRead++);
}
  /**
   * check whether the current chunk has a next page (batch) of data.
   *
   * @return True if the current chunk has more pages to read
   */
  public boolean hasNextPageInCurrentChunk() throws IOException {
    if (chunkReader != null && chunkReader.hasNextBatch()) {
      return true;
    }
    return false;
  }
  /**
   * check whether the current file has a next satisfied chunk and, if so, init a
   * chunk reader for it.
   *
   * @return True if there is a next chunk whose metadata satisfies the filter
   */
  public boolean hasNextChunk() throws IOException {
    // while the current file still has chunks, init a new chunk reader
while (chunkToRead < chunkMetaDataList.size()) {
chunkMetaData = nextChunkMeta();
if (chunkSatisfied(chunkMetaData)) {
// chunk metadata satisfy the condition
initChunkReader(chunkMetaData);
return true;
}
}
return false;
}
public ChunkMetaData currentChunkMeta() {
return chunkMetaData;
}
}
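
Note the contract implied here: hasNextChunk() is the stateful cursor. It advances chunkToRead, filters chunks through chunkSatisfied(), and initializes the chunk reader, so currentChunkMeta() and hasNextPageInCurrentChunk() are only meaningful after it has returned true. A call-order sketch, assuming some initialized concrete FileSeriesReader subclass named reader, with nextBatch()/nextPageHeader() as in the surrounding class:

    while (reader.hasNextChunk()) {                      // positions on the next satisfied chunk
      ChunkMetaData meta = reader.currentChunkMeta();    // valid only after hasNextChunk() == true
      System.out.printf("chunk [%d, %d], %d points%n",
          meta.getStartTime(), meta.getEndTime(), meta.getNumOfPoints());
      while (reader.hasNextPageInCurrentChunk()) {       // pages of the chunk just opened
        reader.nextBatch();                              // consume, or peek nextPageHeader() first
      }
    }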