未验证 提交 f06460bf 编写于 作者: W wshao08 提交者: GitHub

Switch to use new TsFile interfaces in Last query executor (#1048)

* Switch to use new TimeseriesMetadata in Last query executor
上级 3191fc4b
......@@ -54,7 +54,7 @@ public class GroupByFillDataSet extends QueryDataSet {
this.groupByEngineDataSet = groupByEngineDataSet;
this.fillTypes = fillTypes;
initPreviousParis(context, groupByFillPlan);
initLastTimeArray(context);
initLastTimeArray(context, groupByFillPlan);
}
private void initPreviousParis(QueryContext context, GroupByFillPlan groupByFillPlan)
......@@ -75,13 +75,14 @@ public class GroupByFillDataSet extends QueryDataSet {
}
}
private void initLastTimeArray(QueryContext context)
private void initLastTimeArray(QueryContext context, GroupByFillPlan groupByFillPlan)
throws IOException, StorageEngineException, QueryProcessException {
lastTimeArray = new long[paths.size()];
Arrays.fill(lastTimeArray, Long.MAX_VALUE);
for (int i = 0; i < paths.size(); i++) {
TimeValuePair lastTimeValuePair =
LastQueryExecutor.calculateLastPairForOneSeries(paths.get(i), dataTypes.get(i), context);
TimeValuePair lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeries(
paths.get(i), dataTypes.get(i), context,
groupByFillPlan.getAllMeasurementsInDevice(paths.get(i).getDevice()));
if (lastTimeValuePair.getValue() != null) {
lastTimeArray[i] = lastTimeValuePair.getTimestamp();
}
......
......@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.executor;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
import java.util.Set;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
......@@ -35,6 +36,7 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
......@@ -70,7 +72,7 @@ public class LastQueryExecutor {
*
* @param context query context
*/
public QueryDataSet execute(QueryContext context)
public QueryDataSet execute(QueryContext context, LastQueryPlan lastQueryPlan)
throws StorageEngineException, IOException, QueryProcessException {
ListDataSet dataSet = new ListDataSet(
......@@ -78,8 +80,9 @@ public class LastQueryExecutor {
Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
for (int i = 0; i < selectedSeries.size(); i++) {
TimeValuePair lastTimeValuePair =
calculateLastPairForOneSeries(selectedSeries.get(i), dataTypes.get(i), context);
TimeValuePair lastTimeValuePair = calculateLastPairForOneSeries(
selectedSeries.get(i), dataTypes.get(i), context,
lastQueryPlan.getAllMeasurementsInDevice(selectedSeries.get(i).getDevice()));
if (lastTimeValuePair.getValue() != null) {
RowRecord resultRecord = new RowRecord(lastTimeValuePair.getTimestamp());
Field pathField = new Field(TSDataType.TEXT);
......@@ -104,7 +107,7 @@ public class LastQueryExecutor {
* @return TimeValuePair
*/
public static TimeValuePair calculateLastPairForOneSeries(
Path seriesPath, TSDataType tsDataType, QueryContext context)
Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> sensors)
throws IOException, QueryProcessException, StorageEngineException {
// Retrieve last value from MNode
......@@ -128,14 +131,27 @@ public class LastQueryExecutor {
if (!seqFileResources.isEmpty()) {
for (int i = seqFileResources.size() - 1; i >= 0; i--) {
List<ChunkMetadata> chunkMetadata = FileLoaderUtils.loadChunkMetadataFromTsFileResource(
seqFileResources.get(i), seriesPath, context);
if (!chunkMetadata.isEmpty()) {
ChunkMetadata lastChunkMetaData = chunkMetadata.get(chunkMetadata.size() - 1);
Statistics chunkStatistics = lastChunkMetaData.getStatistics();
resultPair = constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
break;
TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(
seqFileResources.get(i), seriesPath, context, null, sensors);
if (timeseriesMetadata != null) {
if (timeseriesMetadata.getStatistics().canUseStatistics()) {
Statistics timeseriesMetadataStats = timeseriesMetadata.getStatistics();
resultPair = constructLastPair(
timeseriesMetadataStats.getEndTime(),
timeseriesMetadataStats.getLastValue(),
tsDataType);
break;
} else {
List<ChunkMetadata> chunkMetadataList = timeseriesMetadata.loadChunkMetadataList();
if (!chunkMetadataList.isEmpty()) {
ChunkMetadata lastChunkMetaData = chunkMetadataList.get(chunkMetadataList.size() - 1);
Statistics chunkStatistics = lastChunkMetaData.getStatistics();
resultPair =
constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
break;
}
}
}
}
}
......@@ -145,9 +161,9 @@ public class LastQueryExecutor {
if (resource.getEndTimeMap().get(seriesPath.getDevice()) < resultPair.getTimestamp()) {
continue;
}
List<ChunkMetadata> chunkMetadata =
FileLoaderUtils.loadChunkMetadataFromTsFileResource(resource, seriesPath, context);
for (ChunkMetadata chunkMetaData : chunkMetadata) {
TimeseriesMetadata timeseriesMetadata =
FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, sensors);
for (ChunkMetadata chunkMetaData : timeseriesMetadata.loadChunkMetadataList()) {
if (chunkMetaData.getEndTime() == resultPair.getTimestamp()
&& chunkMetaData.getVersion() > version) {
Statistics chunkStatistics = chunkMetaData.getStatistics();
......
......@@ -185,7 +185,7 @@ public class QueryRouter implements IQueryRouter {
public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException, IOException {
LastQueryExecutor lastQueryExecutor = new LastQueryExecutor(lastQueryPlan);
return lastQueryExecutor.execute(context);
return lastQueryExecutor.execute(context, lastQueryPlan);
}
}
......@@ -147,8 +147,7 @@ public class PreviousFill extends IFill {
timeseriesMetadata.getStatistics().getLastValue(),
dataType);
} else {
List<ChunkMetadata> seqChunkMetadataList =
FileLoaderUtils.loadChunkMetadataList(timeseriesMetadata);
List<ChunkMetadata> seqChunkMetadataList = timeseriesMetadata.loadChunkMetadataList();
for (int i = seqChunkMetadataList.size() - 1; i >= 0; i--) {
lastPoint = getChunkLastPoint(seqChunkMetadataList.get(i));
......
......@@ -159,50 +159,6 @@ public class FileLoaderUtils {
return chunkReader.loadPageReaderList();
}
/**
* load all ChunkMetadatas belong to the seriesPath
*/
public static List<ChunkMetadata> loadChunkMetadataFromTsFileResource(
TsFileResource resource, Path seriesPath, QueryContext context) throws IOException {
List<ChunkMetadata> chunkMetadataList;
if (resource == null) {
return new ArrayList<>();
}
if (resource.isClosed()) {
chunkMetadataList = ChunkMetadataCache.getInstance().get(resource.getPath(), seriesPath);
} else {
chunkMetadataList = resource.getChunkMetadataList();
}
List<Modification> pathModifications =
context.getPathModifications(resource.getModFile(), seriesPath.getFullPath());
if (!pathModifications.isEmpty()) {
QueryUtils.modifyChunkMetaData(chunkMetadataList, pathModifications);
}
TsFileSequenceReader tsFileSequenceReader =
FileReaderManager.getInstance().get(resource.getPath(), resource.isClosed());
for (ChunkMetadata data : chunkMetadataList) {
data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
}
List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk();
if (memChunks != null) {
for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) {
if (!memChunks.isEmpty()) {
chunkMetadataList.add(readOnlyMemChunk.getChunkMetaData());
}
}
}
/*
* remove empty or invalid chunk metadata
*/
chunkMetadataList.removeIf(chunkMetaData -> (
chunkMetaData.getStartTime() > chunkMetaData.getEndTime()));
return chunkMetadataList;
}
public static List<ChunkMetadata> getChunkMetadataList(Path path, String filePath) throws IOException {
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(filePath, true);
return tsFileReader.getChunkMetadataList(path);
......
......@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.integration;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
......@@ -96,15 +97,15 @@ public class IoTDBLastIT {
}
@Test
public void lastCacheTest() throws SQLException {
String[] retArray1 =
public void lastCacheUpdateTest() throws SQLException, MetadataException {
String[] retArray =
new String[] {
"500,root.ln.wf01.wt01.temperature,22.1",
"500,root.ln.wf01.wt01.status,false",
"500,root.ln.wf01.wt01.id,5"
};
String[] retArray2 =
new String[] {
"500,root.ln.wf01.wt01.id,5",
"700,root.ln.wf01.wt01.temperature,33.1",
"700,root.ln.wf01.wt01.status,false",
"700,root.ln.wf01.wt01.id,3",
"700,root.ln.wf01.wt01.temperature,33.1",
"700,root.ln.wf01.wt01.status,false",
"700,root.ln.wf01.wt01.id,3"
......@@ -124,16 +125,17 @@ public class IoTDBLastIT {
String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
Assert.assertEquals(retArray1[cnt], ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
LeafMNode node = (LeafMNode) MManager.getInstance().getNodeByPath("root.ln.wf01.wt01.temperature");
LeafMNode node =
(LeafMNode) MManager.getInstance().getNodeByPath("root.ln.wf01.wt01.temperature");
node.resetCache();
statement.execute(
"insert into root.ln.wf01.wt01(time, temperature, status, id) values(700, 33.1, false, 3)");
"insert into root.ln.wf01.wt01(time, temperature, status, id) values(700, 33.1, false, 3)");
// Last cache is updated with above insert sql
long time = node.getCachedLast().getTimestamp();
......@@ -141,13 +143,12 @@ public class IoTDBLastIT {
hasResultSet = statement.execute("select last temperature,status,id from root.ln.wf01.wt01");
Assert.assertTrue(hasResultSet);
cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
Assert.assertEquals(retArray2[cnt], ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
......@@ -161,33 +162,26 @@ public class IoTDBLastIT {
hasResultSet = statement.execute("select last temperature,status,id from root.ln.wf01.wt01");
Assert.assertTrue(hasResultSet);
cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
Assert.assertEquals(retArray2[cnt], ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
Assert.assertEquals(cnt, retArray.length);
}
}
@Test
public void lastWithUnSeqFilesTest() {
String[] retArray1 =
public void lastWithUnSeqFilesTest() throws SQLException, MetadataException {
String[] retArray =
new String[] {
"500,root.ln.wf01.wt02.temperature,15.7",
"500,root.ln.wf01.wt02.status,false",
"500,root.ln.wf01.wt02.id,9"
};
String[] retArray2 =
new String[] {
"500,root.ln.wf01.wt02.id,9",
"600,root.ln.wf01.wt02.temperature,10.2",
"600,root.ln.wf01.wt02.status,false",
"600,root.ln.wf01.wt02.id,6"
......@@ -212,7 +206,7 @@ public class IoTDBLastIT {
resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
Assert.assertEquals(retArray1[cnt], ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
......@@ -224,24 +218,21 @@ public class IoTDBLastIT {
statement.execute("flush");
hasResultSet = statement.execute(
"select last temperature,status,id from root.ln.wf01.wt02");
cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
Assert.assertEquals(retArray2[cnt], ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
Assert.assertEquals(cnt, retArray.length);
}
}
@Test
public void lastWithEmptyChunkMetadataTest() throws SQLException {
public void lastWithEmptyChunkMetadataTest() throws SQLException, MetadataException {
String[] retArray =
new String[] {
"300,root.ln.wf01.wt03.temperature,23.1",
......@@ -274,10 +265,6 @@ public class IoTDBLastIT {
}
Assert.assertEquals(cnt, 1);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册