提交 8fc0c7e6 编写于 作者: R RUI, LEI 提交者: 铁头乔

fix unstable unit test and polish ReadOnlyTsFile query logic (#177)

* modify logic: add params in ReadOnlyTsFile's query function not in constructor

* remove loadMode; update the calculation of remaining time ranges

* fix sonar
上级 e7886cab
......@@ -95,10 +95,6 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
}
}
val params = new java.util.HashMap[java.lang.String, java.lang.Long]()
params.put(QueryConstant.PARTITION_START_OFFSET, file.start.asInstanceOf[java.lang.Long])
params.put(QueryConstant.PARTITION_END_OFFSET, (file.start + file.length).asInstanceOf[java.lang.Long])
val tsFileMetaData = reader.readFileMetadata
// get queriedSchema from requiredSchema
......@@ -107,8 +103,9 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
// construct queryExpression based on queriedSchema and filters
val queryExpression = Converter.toQueryExpression(queriedSchema, filters)
val readTsFile: ReadOnlyTsFile = new ReadOnlyTsFile(reader, params)
val queryDataSet = readTsFile.query(queryExpression)
val readTsFile: ReadOnlyTsFile = new ReadOnlyTsFile(reader)
val queryDataSet = readTsFile.query(queryExpression, file.start.asInstanceOf[java.lang.Long],
(file.start + file.length).asInstanceOf[java.lang.Long])
new Iterator[InternalRow] {
private val rowBuffer = Array.fill[Any](requiredSchema.length)(null)
......
......@@ -19,12 +19,6 @@
package org.apache.iotdb.tsfile.common.constant;
public class QueryConstant {
// The start offset for the partition
public static final String PARTITION_START_OFFSET = "partition_start_offset";
// The end offset for the partition
public static final String PARTITION_END_OFFSET = "partition_end_offset";
public static final String RESERVED_TIME = "time";
public static final String BOOLEAN = "BOOLEAN";
......
......@@ -19,7 +19,6 @@
package org.apache.iotdb.tsfile.read;
import java.io.IOException;
import java.util.HashMap;
import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerier;
......@@ -45,21 +44,15 @@ public class ReadOnlyTsFile implements AutoCloseable {
tsFileExecutor = new TsFileExecutor(metadataQuerier, chunkLoader);
}
/**
* constructor, create ReadOnlyTsFile with TsFileSequenceReader.
*/
public ReadOnlyTsFile(TsFileSequenceReader fileReader, HashMap<String, Long> params)
throws IOException {
this.fileReader = fileReader;
this.metadataQuerier = new MetadataQuerierByFileImpl(fileReader, params);
this.chunkLoader = new ChunkLoaderImpl(fileReader);
tsFileExecutor = new TsFileExecutor(metadataQuerier, chunkLoader);
}
/**
 * Execute a query over the whole TsFile.
 *
 * @param queryExpression the query expression (selected series and optional filter)
 * @return the query result set
 * @throws IOException if an I/O error occurs while reading the file
 */
public QueryDataSet query(QueryExpression queryExpression) throws IOException {
  return tsFileExecutor.execute(queryExpression);
}
/**
 * Execute a query restricted to a space (byte-offset) partition of the file.
 *
 * @param queryExpression the query expression (selected series and optional filter)
 * @param partitionStartOffset the start byte offset of the space partition
 * @param partitionEndOffset the end byte offset of the space partition
 * @return the query result set
 * @throws IOException if an I/O error occurs while reading the file
 */
public QueryDataSet query(QueryExpression queryExpression, long partitionStartOffset,
    long partitionEndOffset) throws IOException {
  return tsFileExecutor.execute(queryExpression, partitionStartOffset, partitionEndOffset);
}
/**
 * Close the underlying TsFileSequenceReader.
 *
 * @throws IOException if closing the reader fails
 */
public void close() throws IOException {
  fileReader.close();
}
......
......@@ -19,6 +19,8 @@
package org.apache.iotdb.tsfile.read.common;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
......@@ -46,6 +48,7 @@ public class TimeRange implements Comparable<TimeRange> {
/**
* Initialize a closed interval [min,max].
*
* @param min the left endpoint of the closed interval
* @param max the right endpoint of the closed interval
*/
......@@ -75,11 +78,18 @@ public class TimeRange implements Comparable<TimeRange> {
}
}
/**
* @return true if the value lies between the min and max values, inclusively
*/
public boolean contains(long value) {
return min <= value && max >= value;
/**
 * Set the left endpoint of the closed interval.
 *
 * @param min the new left endpoint; must be non-negative and not larger than the current max
 * @throws IllegalArgumentException if min is negative or larger than the current max
 */
public void setMin(long min) {
  if (min < 0 || min > this.max) {
    throw new IllegalArgumentException("Invalid input!");
  }
  this.min = min;
}
/**
 * Set the right endpoint of the closed interval.
 *
 * @param max the new right endpoint; must be non-negative and not smaller than the current min
 * @throws IllegalArgumentException if max is negative or smaller than the current min
 */
public void setMax(long max) {
  if (max < 0 || max < this.min) {
    throw new IllegalArgumentException("Invalid input!");
  }
  this.max = max;
}
/**
......@@ -89,22 +99,19 @@ public class TimeRange implements Comparable<TimeRange> {
return min <= r.min && max >= r.max;
}
/**
 * Set a closed interval [min,max].
 *
 * @param min the left endpoint of the closed interval
 * @param max the right endpoint of the closed interval
 * @throws IllegalArgumentException if min is larger than max
 */
public void set(long min, long max) {
  if (min > max) {
    throw new IllegalArgumentException("min should not be larger than max.");
  }
  this.min = min;
  this.max = max;
}
/**
 * Copy the endpoints of the given time range into this one.
 *
 * @param r the time range whose [min,max] endpoints are copied
 */
public void set(TimeRange r) {
  set(r.getMin(), r.getMax());
}
/**
......@@ -114,15 +121,6 @@ public class TimeRange implements Comparable<TimeRange> {
return min;
}
/**
* @param min
*/
public void setMin(long min) {
this.min = min;
sort();
}
/**
* @return The upper range boundary
*/
......@@ -130,60 +128,61 @@ public class TimeRange implements Comparable<TimeRange> {
return max;
}
/**
* @param max
*/
public void setMax(long max) {
this.max = max;
sort();
}
private void sort() {
if (min > max) {
long t = min;
min = max;
max = t;
}
}
/**
* @return <code>true</code> if the intersection exists
* Here are some examples.
*
* [1,3] does not intersect with (4,5].
*
* [1,3) does not intersect with (3,5]
*
* [1,3] does not intersect with [5,6].
*
* [1,3] intersects with [2,5].
*
* [1,3] intersects with (3,5].
*
* [1,3) intersects with (2,5].
*
* @param r the given time range
* @return true if the current time range intersects with the given time range r
*/
public boolean intersection(TimeRange r, TimeRange dest) {
if (intersects(r)) {
dest.set(Math.max(min, r.min), Math.min(max, r.max));
private boolean intersects(TimeRange r) {
if ((!leftClose || !r.rightClose) && (r.max < min)) {
// e.g., [1,3] does not intersect with (4,5].
return false;
} else if (!leftClose && !r.rightClose && r.max <= min) {
// e.g.,[1,3) does not intersect with (3,5]
return false;
} else if (leftClose && r.rightClose && r.max <= min - 2) {
// e.g.,[1,3] does not intersect with [5,6].
return true;
} else if ((!rightClose || !r.leftClose) && (r.min > max)) {
return false;
} else if (!rightClose && r.leftClose && r.min >= max) {
return false;
} else if (rightClose && r.leftClose && r.min >= max + 2) {
return false;
} else {
return true;
}
return false;
}
/**
* @return <code>true</code> if the ranges have values in common
*/
public boolean intersects(TimeRange r) {
return overlaps(min, max, r.min, r.max);
}
/**
* @return <code>true</code> if the ranges overlap
*/
public static boolean overlaps(long minA, long maxA, long minB, long maxB) {
if (minA > maxA) {
throw new IllegalArgumentException("Invalid input: minA should not be larger than maxA.");
}
if (minB > maxB) {
throw new IllegalArgumentException("Invalid input: minB should not be larger than maxB.");
}
// Because timestamp is long data type, x and x+1 are considered continuous.
return !(minA >= maxB + 2 || maxA <= minB - 2);
}
/**
 * @return the string representation of this range, using "[ "/" ]" for closed endpoints
 *     and "( "/" )" for open endpoints, e.g. "[ 1 : 3 )"
 */
@Override
public String toString() {
  StringBuilder res = new StringBuilder();
  if (leftClose) {
    res.append("[ ");
  } else {
    res.append("( ");
  }
  res.append(min).append(" : ").append(max);
  if (rightClose) {
    res.append(" ]");
  } else {
    res.append(" )");
  }
  return res.toString();
}
// NOTE the primitive timeRange is always a closed interval [min,max] and
......@@ -191,11 +190,11 @@ public class TimeRange implements Comparable<TimeRange> {
private boolean leftClose = true; // default true
private boolean rightClose = true; // default true
public void setLeftClose(boolean leftClose) {
private void setLeftClose(boolean leftClose) {
this.leftClose = leftClose;
}
public void setRightClose(boolean rightClose) {
private void setRightClose(boolean rightClose) {
this.rightClose = rightClose;
}
......@@ -207,65 +206,105 @@ public class TimeRange implements Comparable<TimeRange> {
return rightClose;
}
/**
 * Merge the given time ranges into their union.
 *
 * <p>The input list is sorted in place in ascending order of the start time; intersecting
 * ranges are then collapsed by widening the earlier range.
 *
 * @param unionCandidates time ranges to be merged
 * @return the union of the time ranges, in ascending order
 */
public static List<TimeRange> sortAndMerge(List<TimeRange> unionCandidates) {
  // sort the time ranges in ascending order of the start time
  Collections.sort(unionCandidates);

  List<TimeRange> mergedRanges = new ArrayList<>();
  TimeRange current = null;
  for (TimeRange candidate : unionCandidates) {
    if (current == null) {
      // the first range seeds the running union
      current = candidate;
    } else if (current.intersects(candidate)) {
      // widen the running union to cover the candidate
      current.set(Math.min(current.getMin(), candidate.getMin()),
          Math.max(current.getMax(), candidate.getMax()));
    } else {
      // gap found: emit the finished union and start a new one
      mergedRanges.add(current);
      current = candidate;
    }
  }
  if (current != null) {
    mergedRanges.add(current);
  }
  return mergedRanges;
}
/**
* Get the remaining time ranges in the current ranges but not in timeRangesPrev.
*
* NOTE the primitive timeRange is always a closed interval [min,max] and only in this function
* are leftClose and rightClose changed.
*
* @param timeRangesPrev time ranges union in ascending order
* @param timeRangesPrev time ranges union in ascending order of the start time
* @return the remaining time ranges
*/
public List<TimeRange> getRemains(ArrayList<TimeRange> timeRangesPrev) {
public List<TimeRange> getRemains(List<TimeRange> timeRangesPrev) {
List<TimeRange> remains = new ArrayList<>();
for (TimeRange prev : timeRangesPrev) {
if (prev.min >= max + 2) { // keep consistent with the definition of `overlap`
break; // break early since timeRangesPrev is sorted
// +2 is to keep consistent with the definition of `intersects` of two closed intervals
if (prev.min >= max + 2) {
// break early since timeRangesPrev is sorted
break;
}
if (intersects(prev)) {
if (prev.contains(this)) {
// e.g., this=[3,5], prev=[1,10]
// e.g., this=[3,5], prev=[3,5] Note that in this case, prev contains this and vice versa.
return remains;
} else if (this.contains(prev)) {
if (prev.min > this.min && prev.max == this.max) {
TimeRange r = new TimeRange(this.min, prev.min);
r.setLeftClose(this.leftClose);
r.setRightClose(false);
remains.add(r);
return remains; // because timeRangesPrev is sorted
} else if (prev.min == this.min) { // && prev.max < this.max
// e.g., this=[1,6], prev=[3,6]
this.setMax(prev.min);
this.setRightClose(false);
remains.add(this);
// return the final result because timeRangesPrev is sorted
return remains;
} else if (prev.min == this.min) {
// Note prev.max < this.max
// e.g., this=[1,10], prev=[1,4]
min = prev.max;
leftClose = false;
} else {
// e.g., prev=[3,6], this=[1,10]
TimeRange r = new TimeRange(this.min, prev.min);
r.setLeftClose(this.leftClose);
r.setRightClose(false);
remains.add(r);
min = prev.max;
leftClose = false;
}
} else { // intersect without one containing the other
} else {
// intersect without one containing the other
if (prev.min < this.min) {
// e.g., this=[3,10], prev=[1,6]
min = prev.max;
leftClose = false;
} else {
TimeRange r = new TimeRange(this.min, prev.min);
r.setLeftClose(this.leftClose);
r.setRightClose(false);
remains.add(r);
// e.g., this=[1,8], prev=[5,12]
this.setMax(prev.min);
this.setRightClose(false);
remains.add(this);
// return the final result because timeRangesPrev is sorted
return remains;
}
}
}
}
TimeRange r = new TimeRange(this.min, this.max);
r.setLeftClose(this.leftClose);
r.setRightClose(this.rightClose);
remains.add(r);
remains.add(this);
return remains;
}
......
......@@ -19,7 +19,6 @@
package org.apache.iotdb.tsfile.read.controller;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
......@@ -54,37 +53,13 @@ public interface MetadataQuerier {
TSDataType getDataType(String measurement) throws NoMeasurementException;
/**
 * Convert the space partition constraint to the time partition constraint.
 *
 * @param paths selected paths in a query expression
 * @param spacePartitionStartPos the start position (byte offset) of the space partition
 * @param spacePartitionEndPos the end position (byte offset) of the space partition
 * @return the converted time partition constraint
 * @throws IOException if an I/O error occurs while reading the file metadata
 */
List<TimeRange> convertSpace2TimePartition(List<Path> paths, long spacePartitionStartPos,
    long spacePartitionEndPos) throws IOException;
}
......@@ -23,13 +23,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.iotdb.tsfile.common.cache.LRUCache;
import org.apache.iotdb.tsfile.common.constant.QueryConstant;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
......@@ -52,50 +50,12 @@ public class MetadataQuerierByFileImpl implements MetadataQuerier {
private TsFileSequenceReader tsFileReader;
private LoadMode mode;
private long partitionStartOffset = 0L;
private long partitionEndOffset = 0L;
public LoadMode getLoadMode() {
return mode;
}
public void setLoadMode(LoadMode mode) {
this.mode = mode;
}
/**
 * Constructor of MetadataQuerierByFileImpl.
 *
 * @param tsFileReader the sequence reader of the target TsFile
 * @throws IOException if an I/O error occurs while reading the file metadata
 */
public MetadataQuerierByFileImpl(TsFileSequenceReader tsFileReader) throws IOException {
  this.tsFileReader = tsFileReader;
  this.fileMetaData = tsFileReader.readFileMetadata();
  // cache chunk metadata per path to avoid repeatedly re-reading device metadata
  chunkMetaDataCache = new LRUCache<Path, List<ChunkMetaData>>(CHUNK_METADATA_CACHE_SIZE) {
    @Override
    public List<ChunkMetaData> loadObjectByKey(Path key) throws IOException {
      return loadChunkMetadata(key);
    }
  };
}
/**
* Constructor of MetadataQuerierByFileImpl.
*/
public MetadataQuerierByFileImpl(TsFileSequenceReader tsFileReader, HashMap<String, Long> params)
throws IOException {
this.tsFileReader = tsFileReader;
this.fileMetaData = tsFileReader.readFileMetadata();
if (!params.containsKey(QueryConstant.PARTITION_START_OFFSET) || !params
.containsKey(QueryConstant.PARTITION_END_OFFSET)) {
throw new IllegalArgumentException(
"Input parameters miss partition_start_offset or partition_end_offset");
}
this.mode = LoadMode.InPartition;
this.partitionStartOffset = params.get(QueryConstant.PARTITION_START_OFFSET);
this.partitionEndOffset = params.get(QueryConstant.PARTITION_END_OFFSET);
chunkMetaDataCache = new LRUCache<Path, List<ChunkMetaData>>(CHUNK_METADATA_CACHE_SIZE) {
@Override
public List<ChunkMetaData> loadObjectByKey(Path key) throws IOException {
......@@ -168,9 +128,6 @@ public class MetadataQuerierByFileImpl implements MetadataQuerier {
break;
}
if (!checkAccess(chunkGroupMetaData)) {
continue;
}
// s1, s2
for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
......@@ -229,90 +186,120 @@ public class MetadataQuerierByFileImpl implements MetadataQuerier {
// get all ChunkMetaData of this path included in all ChunkGroups of this device
List<ChunkMetaData> chunkMetaDataList = new ArrayList<>();
for (ChunkGroupMetaData chunkGroupMetaData : tsDeviceMetadata.getChunkGroupMetaDataList()) {
if (checkAccess(chunkGroupMetaData)) {
List<ChunkMetaData> chunkMetaDataListInOneChunkGroup = chunkGroupMetaData
.getChunkMetaDataList();
for (ChunkMetaData chunkMetaData : chunkMetaDataListInOneChunkGroup) {
if (path.getMeasurement().equals(chunkMetaData.getMeasurementUid())) {
chunkMetaData.setVersion(chunkGroupMetaData.getVersion());
chunkMetaDataList.add(chunkMetaData);
}
List<ChunkMetaData> chunkMetaDataListInOneChunkGroup = chunkGroupMetaData
.getChunkMetaDataList();
for (ChunkMetaData chunkMetaData : chunkMetaDataListInOneChunkGroup) {
if (path.getMeasurement().equals(chunkMetaData.getMeasurementUid())) {
chunkMetaData.setVersion(chunkGroupMetaData.getVersion());
chunkMetaDataList.add(chunkMetaData);
}
}
}
return chunkMetaDataList;
}
public ArrayList<TimeRange> getTimeRangeInOrPrev(List<Path> paths, LoadMode targetMode)
throws IOException {
if (mode == LoadMode.NoPartition) {
throw new IOException(
"Wrong use of getTimeRangeInOrPrev: should not be in NoPartition mode");
@Override
public List<TimeRange> convertSpace2TimePartition(List<Path> paths, long spacePartitionStartPos,
long spacePartitionEndPos) throws IOException {
if (spacePartitionStartPos > spacePartitionEndPos) {
throw new IllegalArgumentException(
"'spacePartitionStartPos' should not be larger than 'spacePartitionEndPos'.");
}
// change the mode temporarily to control loadChunkMetadata's checkAccess function
this.mode = targetMode;
// (1) get timeRangesInCandidates and timeRangesBeforeCandidates by iterating through the metadata
ArrayList<TimeRange> timeRangesInCandidates = new ArrayList<>();
ArrayList<TimeRange> timeRangesBeforeCandidates = new ArrayList<>();
ArrayList<TimeRange> unionCandidates = new ArrayList<>();
// group measurements by device
TreeMap<String, Set<String>> deviceMeasurementsMap = new TreeMap<>();
for (Path path : paths) {
List<ChunkMetaData> chunkMetaDataList = loadChunkMetadata(path);
for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
unionCandidates
.add(new TimeRange(chunkMetaData.getStartTime(), chunkMetaData.getEndTime()));
if (!deviceMeasurementsMap.containsKey(path.getDevice())) {
deviceMeasurementsMap.put(path.getDevice(), new HashSet<>());
}
deviceMeasurementsMap.get(path.getDevice()).add(path.getMeasurement());
}
Collections.sort(unionCandidates);
for (Map.Entry<String, Set<String>> deviceMeasurements : deviceMeasurementsMap.entrySet()) {
String selectedDevice = deviceMeasurements.getKey();
Set<String> selectedMeasurements = deviceMeasurements.getValue();
// union
ArrayList<TimeRange> unionResult = new ArrayList<>();
Iterator<TimeRange> iterator = unionCandidates.iterator();
TimeRange range_curr = null;
TsDeviceMetadataIndex index = fileMetaData.getDeviceMetadataIndex(selectedDevice);
TsDeviceMetadata tsDeviceMetadata = tsFileReader.readTsDeviceMetaData(index);
if (!iterator.hasNext()) {
return unionResult;
} else {
TimeRange r = iterator.next();
range_curr = new TimeRange(r.getMin(), r.getMax());
for (ChunkGroupMetaData chunkGroupMetaData : tsDeviceMetadata
.getChunkGroupMetaDataList()) {
LocateStatus mode = checkLocateStatus(chunkGroupMetaData, spacePartitionStartPos,
spacePartitionEndPos);
if (mode == LocateStatus.after) {
continue;
}
for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
String currentMeasurement = chunkMetaData.getMeasurementUid();
if (selectedMeasurements.contains(currentMeasurement)) {
TimeRange timeRange = new TimeRange(chunkMetaData.getStartTime(),
chunkMetaData.getEndTime());
if (mode == LocateStatus.in) {
timeRangesInCandidates.add(timeRange);
} else {
timeRangesBeforeCandidates.add(timeRange);
}
}
}
}
}
while (iterator.hasNext()) {
TimeRange range_next = iterator.next();
if (range_curr.intersects(range_next)) {
range_curr.set(Math.min(range_curr.getMin(), range_next.getMin()),
Math.max(range_curr.getMax(), range_next.getMax()));
} else {
unionResult.add(new TimeRange(range_curr.getMin(), range_curr.getMax()));
range_curr.set(range_next.getMin(), range_next.getMax());
}
// (2) sort and merge the timeRangesInCandidates
ArrayList<TimeRange> timeRangesIn = new ArrayList<>(
TimeRange.sortAndMerge(timeRangesInCandidates));
if (timeRangesIn.isEmpty()) {
return Collections.emptyList(); // return an empty list
}
unionResult.add(new TimeRange(range_curr.getMin(), range_curr.getMax()));
this.mode = LoadMode.InPartition; // restore the mode to InPartition
return unionResult;
// (3) sort and merge the timeRangesBeforeCandidates
ArrayList<TimeRange> timeRangesBefore = new ArrayList<>(
TimeRange.sortAndMerge(timeRangesBeforeCandidates));
// (4) calculate the remaining time ranges
List<TimeRange> resTimeRanges = new ArrayList<>();
for (TimeRange in : timeRangesIn) {
ArrayList<TimeRange> remains = new ArrayList<>(in.getRemains(timeRangesBefore));
resTimeRanges.addAll(remains);
}
return resTimeRanges;
}
/**
* check if the given chunk group can be accessed under the current load mode
* @param chunkGroupMetaData a chunk group's metadata
* @return True if the chunk group can be accessed. False otherwise.
* @throws IOException illegal mode
* Check the location of a given chunkGroupMetaData with respect to a space partition constraint.
*
* @param chunkGroupMetaData the given chunkGroupMetaData
* @param spacePartitionStartPos the start position of the space partition
* @param spacePartitionEndPos the end position of the space partition
* @return LocateStatus
*/
private boolean checkAccess(ChunkGroupMetaData chunkGroupMetaData) throws IOException {
private LocateStatus checkLocateStatus(ChunkGroupMetaData chunkGroupMetaData,
long spacePartitionStartPos, long spacePartitionEndPos) {
long startOffsetOfChunkGroup = chunkGroupMetaData.getStartOffsetOfChunkGroup();
long endOffsetOfChunkGroup = chunkGroupMetaData.getEndOffsetOfChunkGroup();
long middleOffsetOfChunkGroup = (startOffsetOfChunkGroup + endOffsetOfChunkGroup) / 2;
switch (mode) {
case NoPartition:
return true; // always true
case InPartition:
return (partitionStartOffset <= middleOffsetOfChunkGroup
&& middleOffsetOfChunkGroup < partitionEndOffset);
case PrevPartition:
return (middleOffsetOfChunkGroup < partitionStartOffset);
default:
throw new IOException(
"unexpected mode! It should be one of {NoPartition, InPartition, BeforePartition}");
if (spacePartitionStartPos <= middleOffsetOfChunkGroup
&& middleOffsetOfChunkGroup < spacePartitionEndPos) {
return LocateStatus.in;
} else if (middleOffsetOfChunkGroup < spacePartitionStartPos) {
return LocateStatus.before;
} else {
return LocateStatus.after;
}
}
/**
 * The location of a chunkGroupMetaData with respect to a space partition constraint.
 *
 * <p>in - the middle point of the chunkGroupMetaData is located in the current space partition.
 *
 * <p>before - the middle point of the chunkGroupMetaData is located before the current space
 * partition.
 *
 * <p>after - the middle point of the chunkGroupMetaData is located after the current space
 * partition.
 */
private enum LocateStatus {
  in, before, after
}
}
......@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read.query.executor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
......@@ -29,7 +30,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerier;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerier.LoadMode;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
......@@ -54,62 +54,7 @@ public class TsFileExecutor implements QueryExecutor {
@Override
public QueryDataSet execute(QueryExpression queryExpression) throws IOException {
LoadMode mode = metadataQuerier.getLoadMode();
if (mode == LoadMode.PrevPartition) {
throw new IOException("Wrong use of PrevPartition mode.");
}
if (metadataQuerier.getLoadMode() == LoadMode.InPartition) {
// (1) get the sorted union covered time ranges of chunkGroups in the current partition
ArrayList<TimeRange> timeRangesIn = metadataQuerier
.getTimeRangeInOrPrev(queryExpression.getSelectedSeries(), LoadMode.InPartition);
// (2) check if null
if (timeRangesIn.size() == 0) {
return new DataSetWithoutTimeGenerator(new ArrayList<Path>(), new ArrayList<TSDataType>(),
new ArrayList<FileSeriesReader>()); // return empty QueryDataSet
}
// (3) get the sorted union covered time ranges of chunkGroups before the current partition
ArrayList<TimeRange> timeRangesPrev = metadataQuerier
.getTimeRangeInOrPrev(queryExpression.getSelectedSeries(), LoadMode.PrevPartition);
// (4) calculate the remaining time range
ArrayList<TimeRange> timeRangesRemains = new ArrayList<>();
for (TimeRange in : timeRangesIn) {
ArrayList<TimeRange> remains = new ArrayList<>(in.getRemains(timeRangesPrev));
timeRangesRemains.addAll(remains);
}
// (5) check if null
if (timeRangesRemains.size() == 0) {
return new DataSetWithoutTimeGenerator(new ArrayList<Path>(), new ArrayList<TSDataType>(),
new ArrayList<FileSeriesReader>()); // return empty QueryDataSet
}
// (6) add an additional global time filter based on the remaining time range
IExpression timeBound = timeRangesRemains.get(0).getExpression();
for (int i = 1; i < timeRangesRemains.size(); i++) {
timeBound = BinaryExpression
.or(timeBound, timeRangesRemains.get(i).getExpression());
}
if (queryExpression.hasQueryFilter()) {
IExpression timeBoundExpression = BinaryExpression
.and(queryExpression.getExpression(), timeBound);
queryExpression.setExpression(timeBoundExpression);
} else {
queryExpression.setExpression(timeBound);
}
// (7) with global time filters, we can now remove partition constraints
metadataQuerier.setLoadMode(LoadMode.NoPartition);
metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries());
} else { // NoPartition mode
metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries());
}
metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries());
if (queryExpression.hasQueryFilter()) {
try {
IExpression expression = queryExpression.getExpression();
......@@ -136,6 +81,47 @@ public class TsFileExecutor implements QueryExecutor {
}
}
/**
 * Query with the space partition constraint.
 *
 * <p>The space partition constraint is first converted into a time partition constraint,
 * which is then attached to the query expression as an additional global time filter, after
 * which the query proceeds as an ordinary (unpartitioned) query.
 *
 * @param queryExpression query expression
 * @param spacePartitionStartPos the start position of the space partition
 * @param spacePartitionEndPos the end position of the space partition
 * @return QueryDataSet
 */
public QueryDataSet execute(QueryExpression queryExpression, long spacePartitionStartPos,
    long spacePartitionEndPos) throws IOException {
  // convert the space partition constraint to the time partition constraint
  List<TimeRange> timeRanges = metadataQuerier.convertSpace2TimePartition(
      queryExpression.getSelectedSeries(), spacePartitionStartPos, spacePartitionEndPos);

  // no data falls in this partition: return an empty QueryDataSet
  if (timeRanges.isEmpty()) {
    return new DataSetWithoutTimeGenerator(Collections.emptyList(), Collections.emptyList(),
        Collections.emptyList());
  }

  // OR together the remaining time ranges into one additional time filter
  IExpression additionalTimeFilter = null;
  for (TimeRange timeRange : timeRanges) {
    IExpression rangeExpression = timeRange.getExpression();
    additionalTimeFilter = (additionalTimeFilter == null)
        ? rangeExpression
        : BinaryExpression.or(additionalTimeFilter, rangeExpression);
  }

  // AND the additional time filter with the original filter, if any
  if (queryExpression.hasQueryFilter()) {
    queryExpression.setExpression(
        BinaryExpression.and(queryExpression.getExpression(), additionalTimeFilter));
  } else {
    queryExpression.setExpression(additionalTimeFilter);
  }

  // the partition constraint is now an ordinary time filter; query as normal
  return execute(queryExpression);
}
/**
* no filter, can use multi-way merge.
*
......
......@@ -18,65 +18,102 @@
*/
package org.apache.iotdb.tsfile.read;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iotdb.tsfile.common.constant.QueryConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorForTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
This test is designed for the TsFileExecutor's execute(queryExpression, spacePartitionStartPos, spacePartitionEndPos) function.
The test target here is the logic of converting the query partition constraint to an additional time filter.
Note that the correctness of the constructed additional time filter, which is guaranteed and tested in
MetadataQuerierByFileImplTest and TimeRangeTest, is not the test focus here.
*/
public class ReadInPartitionTest {
private static final String FILE_PATH = TsFileGeneratorForTest.outputDataFile;
private TsFileSequenceReader reader;
private static ReadOnlyTsFile roTsFile = null;
private static final Logger LOG = LoggerFactory.getLogger(ReadInPartitionTest.class);
private ArrayList<TimeRange> d1s6timeRangeList = new ArrayList<>();
private ArrayList<TimeRange> d2s1timeRangeList = new ArrayList<>();
private ArrayList<long[]> d1chunkGroupMetaDataOffsetList = new ArrayList<>();
private ArrayList<long[]> d2chunkGroupMetaDataOffsetList = new ArrayList<>();
@Before
public void before() throws InterruptedException, WriteProcessException, IOException {
TsFileGeneratorForTest.generateFile(1000000, 1024 * 1024, 10000);
reader = new TsFileSequenceReader(FILE_PATH);
TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH);
roTsFile = new ReadOnlyTsFile(reader);
LOG.info("file length: {}", new File(FILE_PATH).length());
LOG.info("file magic head: {}", reader.readHeadMagic());
LOG.info("file magic tail: {}", reader.readTailMagic());
LOG.info("Level 1 metadata position: {}", reader.getFileMetadataPos());
LOG.info("Level 1 metadata size: {}", reader.getFileMetadataPos());
// Because the size of the generated chunkGroupMetaData may differ under different test environments,
// we get metadata from the real-time generated TsFile instead of using a fixed parameter setting.
TsFileMetaData metaData = reader.readFileMetadata();
System.out.println("[Metadata]");
List<TsDeviceMetadataIndex> deviceMetadataIndexList = metaData.getDeviceMap().values().stream()
.sorted((x, y) -> (int) (x.getOffset() - y.getOffset())).collect(Collectors.toList());
for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata.getChunkGroupMetaDataList();
for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
LOG.info("t[Device]Device:{}", chunkGroupMetaData.getDeviceID());
LOG.info("chunkGroupMetaData.start:{}, end:{}",
chunkGroupMetaData.getStartOffsetOfChunkGroup(),
chunkGroupMetaData.getEndOffsetOfChunkGroup());
// for (ChunkMetaData chunkMetadata : chunkGroupMetaData.getChunkMetaDataList()) {
// System.out.println("\t\tMeasurement:" + chunkMetadata.getMeasurementUid());
// System.out.println("\t\tFile offset:" + chunkMetadata.getOffsetOfChunkHeader());
// }
TsDeviceMetadataIndex d1MetadataIndex = metaData.getDeviceMap().get("d1");
TsDeviceMetadataIndex d2MetadataIndex = metaData.getDeviceMap().get("d2");
TsDeviceMetadata d1Metadata = reader.readTsDeviceMetaData(d1MetadataIndex);
List<ChunkGroupMetaData> d1chunkGroupMetaDataList = d1Metadata.getChunkGroupMetaDataList();
for (ChunkGroupMetaData chunkGroupMetaData : d1chunkGroupMetaDataList) {
// get a series of [startOffsetOfChunkGroup, endOffsetOfChunkGroup] from the chunkGroupMetaData of d1
long[] chunkGroupMetaDataOffset = new long[2];
chunkGroupMetaDataOffset[0] = chunkGroupMetaData.getStartOffsetOfChunkGroup();
chunkGroupMetaDataOffset[1] = chunkGroupMetaData.getEndOffsetOfChunkGroup();
d1chunkGroupMetaDataOffsetList.add(chunkGroupMetaDataOffset);
List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
if (chunkMetaData.getMeasurementUid().equals("s6")) {
// get a series of [startTime, endTime] of d1.s6 from the chunkGroupMetaData of d1
d1s6timeRangeList
.add(new TimeRange(chunkMetaData.getStartTime(), chunkMetaData.getEndTime()));
}
}
}
TsDeviceMetadata d2Metadata = reader.readTsDeviceMetaData(d2MetadataIndex);
List<ChunkGroupMetaData> d2chunkGroupMetaDataList = d2Metadata.getChunkGroupMetaDataList();
for (ChunkGroupMetaData chunkGroupMetaData : d2chunkGroupMetaDataList) {
// get a series of [startOffsetOfChunkGroup, endOffsetOfChunkGroup] from the chunkGroupMetaData of d2
long[] chunkGroupMetaDataOffset = new long[2];
chunkGroupMetaDataOffset[0] = chunkGroupMetaData.getStartOffsetOfChunkGroup();
chunkGroupMetaDataOffset[1] = chunkGroupMetaData.getEndOffsetOfChunkGroup();
d2chunkGroupMetaDataOffsetList.add(chunkGroupMetaDataOffset);
List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
if (chunkMetaData.getMeasurementUid().equals("s1")) {
// get a series of [startTime, endTime] of d2.s1 from the chunkGroupMetaData of d1
d2s1timeRangeList
.add(new TimeRange(chunkMetaData.getStartTime(), chunkMetaData.getEndTime()));
}
}
}
}
......@@ -88,107 +125,116 @@ public class ReadInPartitionTest {
}
@Test
// NOTE(review): diff residue — OLD test1 (constructor params) and NEW test0
// (query(expr, start, end)) are interleaved here. Code kept byte-identical.
// OLD variant: partition offsets were passed via a params map to the constructor.
public void test1() throws IOException {
HashMap<String, Long> params = new HashMap<>();
params.put(QueryConstant.PARTITION_START_OFFSET, 0L);
params.put(QueryConstant.PARTITION_END_OFFSET, 603242L);
roTsFile = new ReadOnlyTsFile(reader, params);
// NEW variant: an empty partition [0, 0) should select nothing.
public void test0() throws IOException {
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("d1.s6"));
paths.add(new Path("d2.s1"));
QueryExpression queryExpression = QueryExpression.create(paths, null);
// OLD call shape:
QueryDataSet queryDataSet = roTsFile.query(queryExpression);
// NEW call shape — partition offsets as query arguments:
QueryDataSet queryDataSet = roTsFile.query(queryExpression, 0L, 0L);
// OLD body: counted rows and pinned exact row strings (environment-dependent, unstable).
int cnt = 0;
while (queryDataSet.hasNext()) {
RowRecord r = queryDataSet.next();
cnt++;
if (cnt == 1) {
Assert.assertEquals("1480562618000\t0.0\t1", r.toString());
} else if (cnt == 9352) {
Assert.assertEquals("1480562664755\tnull\t467551", r.toString());
}
}
Assert.assertEquals(9353, cnt);
// test the transformed expression
// NEW body: an empty partition yields no time filter and no rows.
Assert.assertNull(queryExpression.getExpression());
// test the equivalence of the query result
Assert.assertFalse(queryDataSet.hasNext());
}
@Test
// NOTE(review): diff residue — OLD test2 and NEW test1 interleaved; code byte-identical.
// OLD variant header:
public void test2() throws IOException {
HashMap<String, Long> params = new HashMap<>();
params.put(QueryConstant.PARTITION_START_OFFSET, 603242L);
params.put(QueryConstant.PARTITION_END_OFFSET, 993790L);
roTsFile = new ReadOnlyTsFile(reader, params);
// NEW variant header — query restricted to d1's first chunk group by offsets:
public void test1() throws IOException, QueryFilterOptimizationException {
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("d1.s6"));
paths.add(new Path("d2.s1"));
QueryExpression queryExpression = QueryExpression.create(paths, null);
// OLD call shape:
QueryDataSet queryDataSet = roTsFile.query(queryExpression);
// NEW call shape:
QueryDataSet queryDataSet = roTsFile
.query(queryExpression, d1chunkGroupMetaDataOffsetList.get(0)[0],
d1chunkGroupMetaDataOffsetList.get(0)[1]);
// get the transformed expression
IExpression transformedExpression = queryExpression.getExpression();
int cnt = 0;
while (queryDataSet.hasNext()) {
// test the transformed expression
// NEW: the space partition should be rewritten into a GLOBAL_TIME filter that
// equals the optimized time expression of d1.s6's first chunk time range.
Assert.assertEquals(ExpressionType.GLOBAL_TIME, transformedExpression.getType());
IExpression expectedTimeExpression = d1s6timeRangeList.get(0).getExpression();
String expected = ExpressionOptimizer.getInstance().optimize(expectedTimeExpression,
queryExpression.getSelectedSeries()).toString();
Assert.assertEquals(expected, transformedExpression.toString());
// test the equivalence of the query result:
// NEW: re-running the (already transformed) expression must return identical rows.
QueryDataSet queryDataSet_eq = roTsFile.query(queryExpression);
while (queryDataSet.hasNext() && queryDataSet_eq.hasNext()) {
RowRecord r = queryDataSet.next();
cnt++;
if (cnt == 1) {
Assert.assertEquals("1480562664765\tnull\t467651", r.toString());
}
RowRecord r2 = queryDataSet_eq.next();
Assert.assertEquals(r2.toString(), r.toString());
}
Assert.assertEquals(1, cnt);
Assert.assertEquals(queryDataSet_eq.hasNext(), queryDataSet.hasNext());
}
@Test
// NOTE(review): diff residue — OLD test3 and NEW test2 interleaved; code byte-identical.
// OLD variant header:
public void test3() throws IOException {
HashMap<String, Long> params = new HashMap<>();
params.put(QueryConstant.PARTITION_START_OFFSET, 993790L);
params.put(QueryConstant.PARTITION_END_OFFSET, 1608255L);
roTsFile = new ReadOnlyTsFile(reader, params);
// NEW variant header — user filter combined with a space-partition constraint:
public void test2() throws IOException, QueryFilterOptimizationException {
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("d1.s6"));
paths.add(new Path("d2.s1"));
// OLD: no filter.
QueryExpression queryExpression = QueryExpression.create(paths, null);
QueryDataSet queryDataSet = roTsFile.query(queryExpression);
int cnt = 0;
while (queryDataSet.hasNext()) {
// NEW: global time filter time > 50 AND-ed with the partition's time range.
IExpression expression = new GlobalTimeExpression(TimeFilter.gt(50L));
QueryExpression queryExpression = QueryExpression.create(paths, expression);
QueryDataSet queryDataSet = roTsFile
.query(queryExpression, d1chunkGroupMetaDataOffsetList.get(0)[0],
d1chunkGroupMetaDataOffsetList.get(0)[1]);
// get the transformed expression
IExpression transformedExpression = queryExpression.getExpression();
// test the transformed expression
Assert.assertEquals(ExpressionType.GLOBAL_TIME, transformedExpression.getType());
IExpression expectedTimeExpression = BinaryExpression
.and(expression, d1s6timeRangeList.get(0).getExpression());
String expected = ExpressionOptimizer.getInstance().optimize(expectedTimeExpression,
queryExpression.getSelectedSeries()).toString();
Assert.assertEquals(expected, transformedExpression.toString());
// test the equivalence of the query result:
QueryDataSet queryDataSet_eq = roTsFile.query(queryExpression);
while (queryDataSet.hasNext() && queryDataSet_eq.hasNext()) {
RowRecord r = queryDataSet.next();
cnt++;
// OLD: exact row-string assertions (environment-dependent, removed by this commit).
if (cnt == 1) {
Assert.assertEquals("1480562664770\t5196.0\t467701", r.toString());
} else if (cnt == 9936) {
Assert.assertEquals("1480562711445\tnull\t934451", r.toString());
}
RowRecord r2 = queryDataSet_eq.next();
Assert.assertEquals(r2.toString(), r.toString());
}
Assert.assertEquals(9337, cnt);
Assert.assertEquals(queryDataSet_eq.hasNext(), queryDataSet.hasNext());
}
@Test
// NOTE(review): diff residue — OLD test4 and NEW test3 interleaved; code byte-identical.
// OLD variant header:
public void test4() throws IOException {
HashMap<String, Long> params = new HashMap<>();
params.put(QueryConstant.PARTITION_START_OFFSET, 1608255L);
params.put(QueryConstant.PARTITION_END_OFFSET, 1999353L);
roTsFile = new ReadOnlyTsFile(reader, params);
// NEW variant header — single-series value filter combined with the partition:
public void test3() throws IOException, QueryFilterOptimizationException {
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("d1.s6"));
paths.add(new Path("d2.s1"));
// OLD: no filter.
QueryExpression queryExpression = QueryExpression.create(paths, null);
QueryDataSet queryDataSet = roTsFile.query(queryExpression);
int cnt = 0;
while (queryDataSet.hasNext()) {
// NEW: value filter d1.s3 > 10 — transformed result stays a SERIES expression.
Filter filter = ValueFilter.gt(10L);
IExpression expression = new SingleSeriesExpression(new Path("d1.s3"), filter);
QueryExpression queryExpression = QueryExpression.create(paths, expression);
QueryDataSet queryDataSet = roTsFile
.query(queryExpression, d1chunkGroupMetaDataOffsetList.get(0)[0],
d1chunkGroupMetaDataOffsetList.get(0)[1]);
// get the transformed expression
IExpression transformedExpression = queryExpression.getExpression();
// test the transformed expression
Assert.assertEquals(ExpressionType.SERIES, transformedExpression.getType());
IExpression expectedTimeExpression = BinaryExpression
.and(expression, d1s6timeRangeList.get(0).getExpression());
String expected = ExpressionOptimizer.getInstance().optimize(expectedTimeExpression,
queryExpression.getSelectedSeries()).toString();
Assert.assertEquals(expected, transformedExpression.toString());
// test the equivalence of the query result:
QueryDataSet queryDataSet_eq = roTsFile.query(queryExpression);
while (queryDataSet.hasNext() && queryDataSet_eq.hasNext()) {
RowRecord r = queryDataSet.next();
cnt++;
RowRecord r2 = queryDataSet_eq.next();
Assert.assertEquals(r2.toString(), r.toString());
}
Assert.assertEquals(0, cnt);
Assert.assertEquals(queryDataSet_eq.hasNext(), queryDataSet.hasNext());
}
}
......@@ -21,10 +21,28 @@ package org.apache.iotdb.tsfile.read.common;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import org.junit.Assert;
import org.junit.Test;
public class TimeRangeTest {
/*
 * Overlapping and disjoint candidate ranges: sortAndMerge should union the
 * overlapping ones ([0,10], [3,10], [5,6] collapse into [0,10]) and keep the
 * disjoint ones ([20,30], [100,200]) separate, sorted ascending.
 */
@Test
public void mergeTest() {
  ArrayList<TimeRange> candidates = new ArrayList<>();
  candidates.add(new TimeRange(0L, 10L));
  candidates.add(new TimeRange(3L, 10L));
  candidates.add(new TimeRange(100L, 200L));
  candidates.add(new TimeRange(20L, 30L));
  candidates.add(new TimeRange(5L, 6L));
  ArrayList<TimeRange> merged = new ArrayList<>(TimeRange.sortAndMerge(candidates));
  String[] expected = {"[ 0 : 10 ]", "[ 20 : 30 ]", "[ 100 : 200 ]"};
  Assert.assertEquals(expected.length, merged.size());
  for (int i = 0; i < expected.length; i++) {
    Assert.assertEquals(expected[i], merged.get(i).toString());
  }
}
@Test
/*
no overlap
......@@ -197,7 +215,7 @@ public class TimeRangeTest {
@Test
/*
more than one timerange in previous ranges
more than one time ranges in previous ranges
*/
public void getRemainsTest9() {
TimeRange r = new TimeRange(1, 10);
......@@ -222,4 +240,26 @@ public class TimeRangeTest {
assertEquals(remainRanges.get(2).getRightClose(), true);
}
/*
 * More than one time range in the previous ranges: [3,4] lies inside the
 * queried [1,10], and [11,20] adjoins its right side.
 */
@Test
public void getRemainsTest10() {
  TimeRange queried = new TimeRange(1, 10);
  ArrayList<TimeRange> covered = new ArrayList<>();
  covered.add(new TimeRange(3, 4));
  covered.add(new TimeRange(11, 20));
  ArrayList<TimeRange> remains = new ArrayList<>(queried.getRemains(covered));
  assertEquals(2, remains.size());
  // First remaining piece: [1, 3) — left-closed, right-open.
  assertEquals(1, remains.get(0).getMin());
  assertEquals(3, remains.get(0).getMax());
  assertEquals(true, remains.get(0).getLeftClose());
  assertEquals(false, remains.get(0).getRightClose());
  // Second piece: (4, 11) — open at both ends. Its right bound extends to the
  // start of the next previous range [11,20] rather than stopping at 10.
  assertEquals(4, remains.get(1).getMin());
  assertEquals(11, remains.get(1).getMax());
  assertEquals(false, remains.get(1).getLeftClose());
  assertEquals(false, remains.get(1).getRightClose());
}
}
......@@ -20,15 +20,16 @@ package org.apache.iotdb.tsfile.read.controller;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.iotdb.tsfile.common.constant.QueryConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerier.LoadMode;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorForTest;
import org.junit.After;
import org.junit.Assert;
......@@ -38,86 +39,127 @@ import org.junit.Test;
public class MetadataQuerierByFileImplTest {
private static final String FILE_PATH = TsFileGeneratorForTest.outputDataFile;
private TsFileSequenceReader fileReader;
private TsFileSequenceReader reader;
private ArrayList<TimeRange> d1s6timeRangeList = new ArrayList<>();
private ArrayList<TimeRange> d2s1timeRangeList = new ArrayList<>();
private ArrayList<long[]> d1chunkGroupMetaDataOffsetList = new ArrayList<>();
private ArrayList<long[]> d2chunkGroupMetaDataOffsetList = new ArrayList<>();
@Before
public void before() throws InterruptedException, WriteProcessException, IOException {
  TsFileGeneratorForTest.generateFile(1000000, 1024 * 1024, 10000);
  reader = new TsFileSequenceReader(FILE_PATH);
  // Chunk group sizes differ between test environments, so the expected chunk
  // group offsets and chunk time ranges are read from the freshly generated
  // TsFile's metadata rather than hard-coded.
  TsFileMetaData metaData = reader.readFileMetadata();
  collectDeviceInfo(metaData.getDeviceMap().get("d1"), "s6",
      d1chunkGroupMetaDataOffsetList, d1s6timeRangeList);
  collectDeviceInfo(metaData.getDeviceMap().get("d2"), "s1",
      d2chunkGroupMetaDataOffsetList, d2s1timeRangeList);
}

/**
 * For every chunk group of the device referenced by {@code index}, appends its
 * [startOffsetOfChunkGroup, endOffsetOfChunkGroup] pair to {@code offsetList},
 * and the [startTime, endTime] of each chunk whose measurement id equals
 * {@code measurementId} to {@code timeRangeList}.
 */
private void collectDeviceInfo(TsDeviceMetadataIndex index, String measurementId,
    List<long[]> offsetList, List<TimeRange> timeRangeList) throws IOException {
  TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
  for (ChunkGroupMetaData chunkGroupMetaData : deviceMetadata.getChunkGroupMetaDataList()) {
    offsetList.add(new long[]{chunkGroupMetaData.getStartOffsetOfChunkGroup(),
        chunkGroupMetaData.getEndOffsetOfChunkGroup()});
    for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
      if (chunkMetaData.getMeasurementUid().equals(measurementId)) {
        timeRangeList.add(
            new TimeRange(chunkMetaData.getStartTime(), chunkMetaData.getEndTime()));
      }
    }
  }
}
@After
// Release the file reader and delete the generated fixture after each test.
public void after() throws IOException {
// NOTE(review): diff residue — fileReader.close() is the OLD line and
// reader.close() the NEW one; only one field is assigned per variant, so
// keeping both would NPE. Code kept byte-identical.
fileReader.close();
reader.close();
TsFileGeneratorForTest.after();
}
@Test
public void test_NoPartition() throws IOException {
  // Without any partition constraint, every chunk returned for d2.s1 must
  // belong to measurement s1.
  fileReader = new TsFileSequenceReader(FILE_PATH);
  MetadataQuerierByFileImpl querier = new MetadataQuerierByFileImpl(fileReader);
  List<ChunkMetaData> metaDataList = querier.getChunkMetaDataList(new Path("d2.s1"));
  metaDataList.forEach(
      chunkMetaData -> Assert.assertEquals("s1", chunkMetaData.getMeasurementUid()));
}
// NOTE(review): diff residue — NEW testEmpty and OLD test_InPartition are
// interleaved here; code kept byte-identical.
// NEW variant: an empty space partition [0, 0) converts to zero time ranges.
public void testEmpty() throws IOException {
MetadataQuerierByFileImpl metadataQuerierByFile = new MetadataQuerierByFileImpl(reader);
@Test
// OLD variant: partition passed via params map to the constructor (removed).
public void test_InPartition() throws IOException {
fileReader = new TsFileSequenceReader(FILE_PATH);
HashMap<String, Long> params = new HashMap<>();
params.put(QueryConstant.PARTITION_START_OFFSET, 12L);
params.put(QueryConstant.PARTITION_END_OFFSET, 1999000L);
MetadataQuerierByFileImpl metadataQuerierByFile = new MetadataQuerierByFileImpl(fileReader,
params);
List<ChunkMetaData> chunkMetaDataList = metadataQuerierByFile
.getChunkMetaDataList(new Path("d2.s1"));
Assert.assertEquals(2, chunkMetaDataList.size());
// NOTE different systems have different exact split points.
// Therefore specific start and end time are not tested.
// NEW variant body:
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("d1.s6"));
paths.add(new Path("d2.s1"));
ArrayList<TimeRange> resTimeRanges = new ArrayList<>(metadataQuerierByFile
.convertSpace2TimePartition(paths, 0L, 0L));
Assert.assertEquals(0, resTimeRanges.size());
}
@Test
// NOTE(review): diff residue — OLD test_getTimeRangeInPartition and NEW
// testConvert1 interleaved; code kept byte-identical.
// OLD variant header:
public void test_getTimeRangeInPartition() throws IOException {
fileReader = new TsFileSequenceReader(FILE_PATH);
HashMap<String, Long> params = new HashMap<>();
params.put(QueryConstant.PARTITION_START_OFFSET, 1608255L);
params.put(QueryConstant.PARTITION_END_OFFSET, 3006837L);
// NEW variant header:
public void testConvert1() throws IOException {
MetadataQuerierByFileImpl metadataQuerierByFile = new MetadataQuerierByFileImpl(reader);
// OLD construction with params map:
MetadataQuerierByFileImpl metadataQuerierByFile = new MetadataQuerierByFileImpl(fileReader,
params);
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("d1.s6"));
paths.add(new Path("d2.s1"));
// OLD assertions (removed — relied on environment-specific split points):
ArrayList<TimeRange> timeRanges = metadataQuerierByFile
.getTimeRangeInOrPrev(paths, LoadMode.InPartition);
Assert.assertEquals(2, timeRanges.size());
// NOTE different systems have different exact split points.
// Therefore specific start and end time are not tested.
// NEW body: a partition covering d1's first two chunk groups should convert to
// the sorted union of those chunks' time ranges.
long spacePartitionStartPos = d1chunkGroupMetaDataOffsetList.get(0)[0];
long spacePartitionEndPos = d1chunkGroupMetaDataOffsetList.get(1)[1];
ArrayList<TimeRange> resTimeRanges = new ArrayList<>(metadataQuerierByFile
.convertSpace2TimePartition(paths, spacePartitionStartPos, spacePartitionEndPos));
ArrayList<TimeRange> unionCandidates = new ArrayList<>();
unionCandidates.add(d1s6timeRangeList.get(0));
unionCandidates.add(d2s1timeRangeList.get(0));
unionCandidates.add(d1s6timeRangeList.get(1));
ArrayList<TimeRange> expectedRanges = new ArrayList<>(TimeRange.sortAndMerge(unionCandidates));
Assert.assertEquals(expectedRanges.toString(), resTimeRanges.toString());
}
@Test
// NOTE(review): diff residue — OLD test_getTimeRangePrePartition and NEW
// testConvert2 interleaved; code kept byte-identical.
// OLD variant header:
public void test_getTimeRangePrePartition() throws IOException {
fileReader = new TsFileSequenceReader(FILE_PATH);
// NEW variant header:
public void testConvert2() throws IOException {
MetadataQuerierByFileImpl metadataQuerierByFile = new MetadataQuerierByFileImpl(reader);
// OLD construction with params map:
HashMap<String, Long> params = new HashMap<>();
params.put(QueryConstant.PARTITION_START_OFFSET, 1608255L);
params.put(QueryConstant.PARTITION_END_OFFSET, 3006837L);
MetadataQuerierByFileImpl metadataQuerierByFile = new MetadataQuerierByFileImpl(fileReader,
params);
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("d1.s6"));
paths.add(new Path("d2.s1"));
// OLD assertions (removed — relied on environment-specific split points):
ArrayList<TimeRange> timeRanges = metadataQuerierByFile
.getTimeRangeInOrPrev(paths, LoadMode.PrevPartition);
Assert.assertEquals(2, timeRanges.size());
// NOTE different systems have different exact split points.
// Therefore specific start and end time are not tested.
}
// NEW body: a partition covering only d2's first chunk group should convert to
// the in-partition ranges minus what preceding chunk groups already cover.
long spacePartitionStartPos = d2chunkGroupMetaDataOffsetList.get(0)[0];
long spacePartitionEndPos = d2chunkGroupMetaDataOffsetList.get(0)[1];
ArrayList<TimeRange> resTimeRanges = new ArrayList<>(metadataQuerierByFile
.convertSpace2TimePartition(paths, spacePartitionStartPos, spacePartitionEndPos));
ArrayList<TimeRange> inCandidates = new ArrayList<>();
ArrayList<TimeRange> beforeCandidates = new ArrayList<>();
inCandidates.add(d2s1timeRangeList.get(0));
beforeCandidates.add(d1s6timeRangeList.get(0));
ArrayList<TimeRange> expectedRanges = new ArrayList<>();
for (TimeRange in : inCandidates) {
ArrayList<TimeRange> remains = new ArrayList<>(in.getRemains(beforeCandidates));
expectedRanges.addAll(remains);
}
Assert.assertEquals(expectedRanges.toString(), resTimeRanges.toString());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册