Commit 8a23a21a authored by Lei Rui, committed by Jialin Qiao

add more partition tests for spark-tsfile-connector; format code (#383)

Parent 906dc900
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -18,29 +18,30 @@
*/
package org.apache.iotdb.tsfile.qp;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
/**
* This class is used to execute queries on a TsFile.
*/
public class Executor {
public static List<QueryDataSet> query(ReadOnlyTsFile reader, List<QueryExpression> queryExpressions, long start, long end) {
List<QueryDataSet> dataSets = new ArrayList<>();
try {
for(QueryExpression expression: queryExpressions) {
QueryDataSet queryDataSet = reader.query(expression, start, end);
dataSets.add(queryDataSet);
}
} catch (IOException e) {
e.printStackTrace();
}
return dataSets;
public static List<QueryDataSet> query(ReadOnlyTsFile reader,
List<QueryExpression> queryExpressions, long start, long end) {
List<QueryDataSet> dataSets = new ArrayList<>();
try {
for (QueryExpression expression : queryExpressions) {
QueryDataSet queryDataSet = reader.query(expression, start, end);
dataSets.add(queryDataSet);
}
} catch (IOException e) {
e.printStackTrace();
}
return dataSets;
}
}
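For orientation, a minimal illustrative sketch of driving Executor.query over one file partition. The file name and the root.car.d1.s1/s2 series are placeholders, and the sketch assumes the TsFile read API visible in the imports above, where ReadOnlyTsFile.query(expression, start, end) restricts the scan to a byte-offset range (the bounds a Spark partition task would pass):

import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;

public class ExecutorSketch {
  public static void main(String[] args) throws Exception {
    // "cars.tsfile" and the selected series are hypothetical
    TsFileSequenceReader fileReader = new TsFileSequenceReader("cars.tsfile");
    ReadOnlyTsFile reader = new ReadOnlyTsFile(fileReader);
    List<Path> paths = Arrays.asList(new Path("root.car.d1.s1"), new Path("root.car.d1.s2"));
    QueryExpression expression = QueryExpression.create(paths, null); // no filter
    // 0..Long.MAX_VALUE scans the whole file; a Spark task would pass its partition bounds
    List<QueryDataSet> dataSets =
        Executor.query(reader, Arrays.asList(expression), 0L, Long.MAX_VALUE);
    while (dataSets.get(0).hasNext()) {
      System.out.println(dataSets.get(0).next());
    }
    fileReader.close();
  }
}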
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -18,6 +18,11 @@
*/
package org.apache.iotdb.tsfile.qp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.common.SQLConstant;
import org.apache.iotdb.tsfile.qp.common.SingleQuery;
......@@ -30,161 +35,159 @@ import org.apache.iotdb.tsfile.qp.optimizer.PhysicalOptimizer;
import org.apache.iotdb.tsfile.qp.optimizer.RemoveNotOptimizer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This class is used to convert information given by sparkSQL to construct TSFile's query plans.
* Since TsFile's schema differs from SparkSQL's table schema,
* e.g.
* TSFile's SQL: select s1,s2 from root.car.d1 where s1 = 10
* SparkSQL's SQL: select s1,s2 from XXX where delta_object = d1
* Since TsFile's schema differs from SparkSQL's table schema, e.g. TSFile's SQL: select s1,s2 from
* root.car.d1 where s1 = 10 SparkSQL's SQL: select s1,s2 from XXX where delta_object = d1
*/
public class QueryProcessor {
//construct logical query plans first, then convert them to physical ones
public List<TSQueryPlan> generatePlans(FilterOperator filter, List<String> paths, List<String> columnNames,
TsFileSequenceReader in, Long start, Long end) throws QueryProcessorException, IOException {
List<TSQueryPlan> queryPlans = new ArrayList<>();
//construct logical query plans first, then convert them to physical ones
public List<TSQueryPlan> generatePlans(FilterOperator filter, List<String> paths,
List<String> columnNames, TsFileSequenceReader in, Long start, Long end) throws
QueryProcessorException, IOException {
if (filter != null) {
RemoveNotOptimizer removeNot = new RemoveNotOptimizer();
filter = removeNot.optimize(filter);
List<TSQueryPlan> queryPlans = new ArrayList<>();
DNFFilterOptimizer dnf = new DNFFilterOptimizer();
filter = dnf.optimize(filter);
if (filter != null) {
RemoveNotOptimizer removeNot = new RemoveNotOptimizer();
filter = removeNot.optimize(filter);
// merge different query path
// e.g. or (sensor_1 > 20, sensor_1 <10, sensor_2 > 10) => or (or (sensor_1 > 20, sensor_1 < 10), sensor_2 > 10)
MergeSingleFilterOptimizer merge = new MergeSingleFilterOptimizer();
filter = merge.optimize(filter);
DNFFilterOptimizer dnf = new DNFFilterOptimizer();
filter = dnf.optimize(filter);
List<FilterOperator> filterOperators = splitFilter(filter);
// merge different query path
// e.g. or (sensor_1 > 20, sensor_1 <10, sensor_2 > 10)
// => or (or (sensor_1 > 20, sensor_1 < 10), sensor_2 > 10)
MergeSingleFilterOptimizer merge = new MergeSingleFilterOptimizer();
filter = merge.optimize(filter);
for (FilterOperator filterOperator : filterOperators) {
SingleQuery singleQuery = constructSelectPlan(filterOperator, columnNames);
if (singleQuery != null) {
queryPlans.addAll(new PhysicalOptimizer(columnNames).optimize(singleQuery, paths, in, start, end));
}
}
} else {
queryPlans.addAll(new PhysicalOptimizer(columnNames).optimize(null, paths, in, start, end));
}
// merge query plan
Map<List<String>, List<TSQueryPlan>> pathMap = new HashMap<>();
for (TSQueryPlan tsQueryPlan : queryPlans) {
if (pathMap.containsKey(tsQueryPlan.getPaths())) {
pathMap.get(tsQueryPlan.getPaths()).add(tsQueryPlan);
} else {
List<TSQueryPlan> plans = new ArrayList<>();
plans.add(tsQueryPlan);
pathMap.put(tsQueryPlan.getPaths(), plans);
}
}
List<FilterOperator> filterOperators = splitFilter(filter);
queryPlans.clear();
for (List<TSQueryPlan> plans : pathMap.values()) {
TSQueryPlan mergePlan = null;
for (TSQueryPlan plan : plans) {
if (mergePlan == null) {
mergePlan = plan;
} else {
FilterOperator timeFilterOperator = new FilterOperator(SQLConstant.KW_OR);
List<FilterOperator> timeFilterChildren = new ArrayList<>();
timeFilterChildren.add(mergePlan.getTimeFilterOperator());
timeFilterChildren.add(plan.getTimeFilterOperator());
timeFilterOperator.setChildrenList(timeFilterChildren);
mergePlan.setTimeFilterOperator(timeFilterOperator);
FilterOperator valueFilterOperator = new FilterOperator(SQLConstant.KW_OR);
List<FilterOperator> valueFilterChildren = new ArrayList<>();
valueFilterChildren.add(mergePlan.getValueFilterOperator());
valueFilterChildren.add(plan.getValueFilterOperator());
valueFilterOperator.setChildrenList(valueFilterChildren);
mergePlan.setValueFilterOperator(valueFilterOperator);
}
}
queryPlans.add(mergePlan);
for (FilterOperator filterOperator : filterOperators) {
SingleQuery singleQuery = constructSelectPlan(filterOperator, columnNames);
if (singleQuery != null) {
queryPlans.addAll(new PhysicalOptimizer(columnNames).optimize(singleQuery,
paths, in, start, end));
}
return queryPlans;
}
} else {
queryPlans.addAll(new PhysicalOptimizer(columnNames).optimize(null,
paths, in, start, end));
}
private List<FilterOperator> splitFilter(FilterOperator filterOperator) {
if (filterOperator.isSingle() || filterOperator.getTokenIntType() != SQLConstant.KW_OR) {
List<FilterOperator> ret = new ArrayList<>();
ret.add(filterOperator);
return ret;
}
// a list of conjunctions linked by or
return filterOperator.childOperators;
// merge query plan
Map<List<String>, List<TSQueryPlan>> pathMap = new HashMap<>();
for (TSQueryPlan tsQueryPlan : queryPlans) {
if (pathMap.containsKey(tsQueryPlan.getPaths())) {
pathMap.get(tsQueryPlan.getPaths()).add(tsQueryPlan);
} else {
List<TSQueryPlan> plans = new ArrayList<>();
plans.add(tsQueryPlan);
pathMap.put(tsQueryPlan.getPaths(), plans);
}
}
private SingleQuery constructSelectPlan(FilterOperator filterOperator, List<String> columnNames) throws QueryOperatorException {
FilterOperator timeFilter = null;
FilterOperator valueFilter = null;
List<FilterOperator> columnFilterOperators = new ArrayList<>();
List<FilterOperator> singleFilterList = null;
if (filterOperator.isSingle()) {
singleFilterList = new ArrayList<>();
singleFilterList.add(filterOperator);
queryPlans.clear();
} else if (filterOperator.getTokenIntType() == SQLConstant.KW_AND) {
// the original query plan has been processed by the merge optimizer, thus all nodes with the same path have been
// merged into one node
singleFilterList = filterOperator.getChildren();
for (List<TSQueryPlan> plans : pathMap.values()) {
TSQueryPlan mergePlan = null;
for (TSQueryPlan plan : plans) {
if (mergePlan == null) {
mergePlan = plan;
} else {
FilterOperator timeFilterOperator = new FilterOperator(SQLConstant.KW_OR);
List<FilterOperator> timeFilterChildren = new ArrayList<>();
timeFilterChildren.add(mergePlan.getTimeFilterOperator());
timeFilterChildren.add(plan.getTimeFilterOperator());
timeFilterOperator.setChildrenList(timeFilterChildren);
mergePlan.setTimeFilterOperator(timeFilterOperator);
FilterOperator valueFilterOperator = new FilterOperator(SQLConstant.KW_OR);
List<FilterOperator> valueFilterChildren = new ArrayList<>();
valueFilterChildren.add(mergePlan.getValueFilterOperator());
valueFilterChildren.add(plan.getValueFilterOperator());
valueFilterOperator.setChildrenList(valueFilterChildren);
mergePlan.setValueFilterOperator(valueFilterOperator);
}
}
queryPlans.add(mergePlan);
}
if (singleFilterList == null) {
return null;
}
return queryPlans;
}
List<FilterOperator> valueList = new ArrayList<>();
for (FilterOperator child : singleFilterList) {
if (!child.isSingle()) {
valueList.add(child);
} else {
String singlePath = child.getSinglePath();
if (columnNames.contains(singlePath)) {
if (!columnFilterOperators.contains(child))
columnFilterOperators.add(child);
else
throw new QueryOperatorException(
"The same key filter has been specified more than once: " + singlePath);
} else {
switch (child.getSinglePath()) {
case SQLConstant.RESERVED_TIME:
if (timeFilter != null) {
throw new QueryOperatorException(
"time filter has been specified more than once");
}
timeFilter = child;
break;
default:
valueList.add(child);
break;
}
}
}
}
private List<FilterOperator> splitFilter(FilterOperator filterOperator) {
if (filterOperator.isSingle() || filterOperator.getTokenIntType() != SQLConstant.KW_OR) {
List<FilterOperator> ret = new ArrayList<>();
ret.add(filterOperator);
return ret;
}
// a list of conjunctions linked by or
return filterOperator.childOperators;
}
private SingleQuery constructSelectPlan(FilterOperator filterOperator, List<String> columnNames)
throws QueryOperatorException {
FilterOperator timeFilter = null;
FilterOperator valueFilter = null;
List<FilterOperator> columnFilterOperators = new ArrayList<>();
List<FilterOperator> singleFilterList = null;
if (filterOperator.isSingle()) {
singleFilterList = new ArrayList<>();
singleFilterList.add(filterOperator);
} else if (filterOperator.getTokenIntType() == SQLConstant.KW_AND) {
// the original query plan has been processed by the merge optimizer, thus all nodes with the same
// path have been merged into one node
singleFilterList = filterOperator.getChildren();
}
if (valueList.size() == 1) {
valueFilter = valueList.get(0);
if (singleFilterList == null) {
return null;
}
} else if (valueList.size() > 1) {
valueFilter = new FilterOperator(SQLConstant.KW_AND, false);
valueFilter.childOperators = valueList;
List<FilterOperator> valueList = new ArrayList<>();
for (FilterOperator child : singleFilterList) {
if (!child.isSingle()) {
valueList.add(child);
} else {
String singlePath = child.getSinglePath();
if (columnNames.contains(singlePath)) {
if (!columnFilterOperators.contains(child)) {
columnFilterOperators.add(child);
} else {
throw new QueryOperatorException(
"The same key filter has been specified more than once: " + singlePath);
}
} else {
switch (child.getSinglePath()) {
case SQLConstant.RESERVED_TIME:
if (timeFilter != null) {
throw new QueryOperatorException(
"time filter has been specified more than once");
}
timeFilter = child;
break;
default:
valueList.add(child);
break;
}
}
}
}
if (valueList.size() == 1) {
valueFilter = valueList.get(0);
return new SingleQuery(columnFilterOperators, timeFilter, valueFilter);
} else if (valueList.size() > 1) {
valueFilter = new FilterOperator(SQLConstant.KW_AND, false);
valueFilter.childOperators = valueList;
}
return new SingleQuery(columnFilterOperators, timeFilter, valueFilter);
}
}
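To see the whole pipeline at a glance, a usage sketch of generatePlans, under stated assumptions: the file name, series path, and the s1 > 10 predicate are illustrative, and TSQueryPlan is assumed to live in the qp.common package alongside the other operator classes. The filter is normalized by RemoveNotOptimizer and DNFFilterOptimizer, same-path leaves are grouped by MergeSingleFilterOptimizer, each OR-disjunct becomes physical plans, and plans over identical path sets are merged back with OR-ed time/value filters:

import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.common.SQLConstant;
import org.apache.iotdb.tsfile.qp.common.TSQueryPlan;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;

public class QueryProcessorSketch {
  public static void main(String[] args) throws Exception {
    TsFileSequenceReader in = new TsFileSequenceReader("cars.tsfile"); // hypothetical file
    List<String> paths = Arrays.asList("root.car.d1.s1");
    // columns SparkSQL treats specially, e.g. the device column
    List<String> columnNames = Arrays.asList(SQLConstant.RESERVED_DELTA_OBJECT);
    FilterOperator filter =
        new BasicOperator(SQLConstant.GREATERTHAN, "root.car.d1.s1", "10"); // s1 > 10
    List<TSQueryPlan> plans = new QueryProcessor()
        .generatePlans(filter, paths, columnNames, in, 0L, Long.MAX_VALUE);
    plans.forEach(System.out::println);
    in.close();
  }
}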
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -23,53 +23,52 @@ import org.apache.iotdb.tsfile.qp.exception.BasicOperatorException;
/**
* Basic operators include < > >= <= !=.
*
*/
public class BasicOperator extends FilterOperator {
private String seriesPath;
private String seriesValue;
private String seriesPath;
private String seriesValue;
public BasicOperator(int tokenIntType, String path, String value) {
super(tokenIntType);
this.seriesPath = this.singlePath = path;
this.seriesValue = value;
this.isLeaf = true;
this.isSingle = true;
}
public BasicOperator(int tokenIntType, String path, String value) {
super(tokenIntType);
this.seriesPath = this.singlePath = path;
this.seriesValue = value;
this.isLeaf = true;
this.isSingle = true;
}
public String getSeriesPath() {
return seriesPath;
}
public String getSeriesPath() {
return seriesPath;
}
public String getSeriesValue() {
return seriesValue;
}
public String getSeriesValue() {
return seriesValue;
}
public void setReversedTokenIntType() throws BasicOperatorException {
int intType = SQLConstant.reverseWords.get(tokenIntType);
setTokenIntType(intType);
}
public void setReversedTokenIntType() throws BasicOperatorException {
int intType = SQLConstant.reverseWords.get(tokenIntType);
setTokenIntType(intType);
}
@Override
public String getSinglePath() {
return singlePath;
}
@Override
public String getSinglePath() {
return singlePath;
}
@Override
public BasicOperator clone() {
BasicOperator ret;
ret = new BasicOperator(this.tokenIntType, seriesPath, seriesValue);
ret.tokenSymbol = tokenSymbol;
ret.isLeaf = isLeaf;
ret.isSingle = isSingle;
return ret;
}
@Override
public BasicOperator clone() {
BasicOperator ret;
ret = new BasicOperator(this.tokenIntType, seriesPath, seriesValue);
ret.tokenSymbol = tokenSymbol;
ret.isLeaf = isLeaf;
ret.isSingle = isSingle;
return ret;
}
@Override
public String toString() {
return "[" + seriesPath + tokenSymbol + seriesValue + "]";
}
@Override
public String toString() {
return "[" + seriesPath + tokenSymbol + seriesValue + "]";
}
}
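A small example grounded in the code above (the series path and value are placeholders): setReversedTokenIntType flips the comparison through SQLConstant.reverseWords, which is what lets a NOT above a leaf be absorbed into the leaf itself.

import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.tsfile.qp.common.SQLConstant;
import org.apache.iotdb.tsfile.qp.exception.BasicOperatorException;

class BasicOperatorDemo {
  static void demo() throws BasicOperatorException {
    BasicOperator op = new BasicOperator(SQLConstant.LESSTHAN, "root.car.d1.s1", "10");
    System.out.println(op);       // prints [root.car.d1.s1<10]
    op.setReversedTokenIntType(); // LESSTHAN becomes GREATERTHANOREQUALTO
    System.out.println(op);       // prints [root.car.d1.s1>=10]
  }
}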
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -18,10 +18,9 @@
*/
package org.apache.iotdb.tsfile.qp.common;
import org.apache.iotdb.tsfile.utils.StringContainer;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.tsfile.utils.StringContainer;
/**
......@@ -33,125 +32,126 @@ import java.util.List;
*/
public class FilterOperator extends Operator implements Comparable<FilterOperator> {
public List<FilterOperator> childOperators;
// leaf filter operator means it doesn't have left and right child filterOperator. Leaf filter
// should be BasicOperator.
protected boolean isLeaf = false;
// All recursive children of this filter belong to one series path when isSingle variable is true
protected boolean isSingle = false;
// if isSingle = false, singlePath must be null
protected String singlePath = null;
public FilterOperator(int tokenType) {
super(tokenType);
childOperators = new ArrayList<>();
}
public void setTokenIntType(int intType) {
super.tokenIntType = intType;
super.tokenSymbol = SQLConstant.tokenSymbol.get(tokenIntType);
}
public FilterOperator(int tokenType, boolean isSingle) {
this(tokenType);
this.isSingle = isSingle;
}
public void addHeadDeltaObjectPath(String deltaObject) {
for (FilterOperator child : childOperators) {
child.addHeadDeltaObjectPath(deltaObject);
}
if(isSingle) {
this.singlePath = deltaObject + "." + this.singlePath;
}
}
public List<FilterOperator> getChildren() {
return childOperators;
}
public List<String> getAllPaths() {
List<String> paths = new ArrayList<>();
if(isLeaf) {
paths.add(singlePath);
} else {
for(FilterOperator child: childOperators) {
paths.addAll(child.getAllPaths());
}
}
return paths;
}
public void setChildrenList(List<FilterOperator> children) {
this.childOperators = children;
}
public void setIsSingle(boolean b) {
this.isSingle = b;
}
public void setSinglePath(String p) {
this.singlePath = p;
}
public String getSinglePath() {
return singlePath;
}
public void addChildOPerator(FilterOperator op) {
childOperators.add(op);
}
@Override
public int compareTo(FilterOperator operator) {
if (singlePath == null && operator.singlePath == null) {
return 0;
}
if (singlePath == null) {
return 1;
}
if (operator.singlePath == null) {
return -1;
}
return operator.singlePath.compareTo(singlePath);
}
public List<FilterOperator> childOperators;
// leaf filter operator means it doesn't have left and right child filterOperator. Leaf filter
// should be BasicOperator.
protected boolean isLeaf = false;
// All recursive children of this filter belong to one series path when isSingle variable is true
protected boolean isSingle = false;
// if isSingle = false, singlePath must be null
protected String singlePath = null;
public FilterOperator(int tokenType) {
super(tokenType);
childOperators = new ArrayList<>();
}
public void setTokenIntType(int intType) {
super.tokenIntType = intType;
super.tokenSymbol = SQLConstant.tokenSymbol.get(tokenIntType);
}
public FilterOperator(int tokenType, boolean isSingle) {
this(tokenType);
this.isSingle = isSingle;
}
public void addHeadDeltaObjectPath(String deltaObject) {
for (FilterOperator child : childOperators) {
child.addHeadDeltaObjectPath(deltaObject);
}
if (isSingle) {
this.singlePath = deltaObject + "." + this.singlePath;
}
}
public List<FilterOperator> getChildren() {
return childOperators;
}
public List<String> getAllPaths() {
List<String> paths = new ArrayList<>();
if (isLeaf) {
paths.add(singlePath);
} else {
for (FilterOperator child : childOperators) {
paths.addAll(child.getAllPaths());
}
}
return paths;
}
public void setChildrenList(List<FilterOperator> children) {
this.childOperators = children;
}
public void setIsSingle(boolean b) {
this.isSingle = b;
}
public void setSinglePath(String p) {
this.singlePath = p;
}
public String getSinglePath() {
return singlePath;
}
public void addChildOPerator(FilterOperator op) {
childOperators.add(op);
}
@Override
public int compareTo(FilterOperator operator) {
if (singlePath == null && operator.singlePath == null) {
return 0;
}
if (singlePath == null) {
return 1;
}
if (operator.singlePath == null) {
return -1;
}
return operator.singlePath.compareTo(singlePath);
}
public boolean isLeaf() {
return isLeaf;
}
public boolean isSingle() {
return isSingle;
}
@Override
public String toString() {
StringContainer sc = new StringContainer();
sc.addTail("[", this.tokenSymbol);
if (isSingle) {
sc.addTail("[single:", getSinglePath(), "]");
}
sc.addTail(" ");
for (FilterOperator filter : childOperators) {
sc.addTail(filter.toString());
}
sc.addTail("]");
return sc.toString();
}
public boolean isLeaf() {
return isLeaf;
@Override
public FilterOperator clone() {
FilterOperator ret = new FilterOperator(this.tokenIntType);
ret.tokenSymbol = tokenSymbol;
ret.isLeaf = isLeaf;
ret.isSingle = isSingle;
if (singlePath != null) {
ret.singlePath = singlePath;
}
public boolean isSingle() {
return isSingle;
}
@Override
public String toString() {
StringContainer sc = new StringContainer();
sc.addTail("[", this.tokenSymbol);
if (isSingle) {
sc.addTail("[single:", getSinglePath(), "]");
}
sc.addTail(" ");
for (FilterOperator filter : childOperators) {
sc.addTail(filter.toString());
}
sc.addTail("]");
return sc.toString();
}
@Override
public FilterOperator clone() {
FilterOperator ret = new FilterOperator(this.tokenIntType);
ret.tokenSymbol=tokenSymbol;
ret.isLeaf = isLeaf;
ret.isSingle = isSingle;
if(singlePath != null)
ret.singlePath = singlePath;
for (FilterOperator filterOperator : this.childOperators) {
ret.addChildOPerator(filterOperator.clone());
}
return ret;
for (FilterOperator filterOperator : this.childOperators) {
ret.addChildOPerator(filterOperator.clone());
}
return ret;
}
}
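To make the isSingle/singlePath bookkeeping concrete, a short sketch (device d1 and sensor s1 are hypothetical) that builds an OR over two leaves and then prefixes the device onto every single-path node:

import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.common.SQLConstant;

class FilterOperatorDemo {
  static void demo() {
    FilterOperator or = new FilterOperator(SQLConstant.KW_OR);
    or.addChildOPerator(new BasicOperator(SQLConstant.GREATERTHAN, "s1", "20"));
    or.addChildOPerator(new BasicOperator(SQLConstant.LESSTHAN, "s1", "10"));
    or.addHeadDeltaObjectPath("d1");      // each leaf's singlePath becomes d1.s1
    System.out.println(or.getAllPaths()); // [d1.s1, d1.s1]
    System.out.println(or);               // [| [d1.s1>20][d1.s1<10]]
  }
}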
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -23,24 +23,25 @@ package org.apache.iotdb.tsfile.qp.common;
*
*/
public abstract class Operator {
int tokenIntType;
String tokenSymbol;
Operator(int tokenIntType) {
this.tokenIntType = tokenIntType;
this.tokenSymbol = SQLConstant.tokenSymbol.get(tokenIntType);
}
int tokenIntType;
String tokenSymbol;
Operator(int tokenIntType) {
this.tokenIntType = tokenIntType;
this.tokenSymbol = SQLConstant.tokenSymbol.get(tokenIntType);
}
public int getTokenIntType() {
return tokenIntType;
}
public int getTokenIntType() {
return tokenIntType;
}
public String getTokenSymbol() {
return tokenSymbol;
}
public String getTokenSymbol() {
return tokenSymbol;
}
@Override
public String toString() {
return tokenSymbol;
}
@Override
public String toString() {
return tokenSymbol;
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -23,128 +23,127 @@ import java.util.Map;
/**
* This class contains several constants used in SQL.
*
*/
public class SQLConstant {
public static final String DELTA_OBJECT_NAME = "delta_object_name";
public static final String REGEX_PATH_SEPARATOR = "\\.";
public static final String PATH_SEPARATOR = ".";
public static final String RESERVED_TIME = "time";
public static final String RESERVED_FREQ = "freq";
public static final String RESERVED_DELTA_OBJECT = "device_name";
public static final String INT32 = "INT32";
public static final String INT64 = "INT64";
public static final String FLOAT = "FLOAT";
public static final String DOUBLE = "DOUBLE";
public static final String BYTE_ARRAY = "BYTE_ARRAY";
public static final int KW_AND = 1;
public static final int KW_OR = 2;
public static final int KW_NOT = 3;
public static final int EQUAL = 11;
public static final int NOTEQUAL = 12;
public static final int LESSTHANOREQUALTO = 13;
public static final int LESSTHAN = 14;
public static final int GREATERTHANOREQUALTO = 15;
public static final int GREATERTHAN = 16;
public static final int EQUAL_NS = 17;
public static final int TOK_SELECT = 21;
public static final int TOK_FROM = 22;
public static final int TOK_WHERE = 23;
public static final int TOK_INSERT = 24;
public static final int TOK_DELETE = 25;
public static final int TOK_UPDATE = 26;
public static final int TOK_QUERY = 27;
public static final int TOK_AUTHOR_CREATE = 41;
public static final int TOK_AUTHOR_DROP = 42;
public static final int TOK_AUTHOR_GRANT = 43;
public static final int TOK_AUTHOR_REVOKE = 44;
public static final int TOK_DATALOAD = 45;
public static final int TOK_METADATA_CREATE = 51;
public static final int TOK_METADATA_DELETE = 52;
public static final int TOK_METADATA_SET_FILE_LEVEL = 53;
public static final int TOK_PORPERTY_CREATE = 54;
public static final int TOK_PORPERTY_ADD_LABEL = 55;
public static final int TOK_PORPERTY_DELETE_LABEL = 56;
public static final int TOK_PORPERTY_LINK = 57;
public static final int TOK_PORPERTY_UNLINK = 58;
public static Map<Integer, String> tokenSymbol = new HashMap<Integer, String>();
public static Map<Integer, String> tokenNames = new HashMap<Integer, String>();
public static Map<Integer, Integer> reverseWords = new HashMap<Integer, Integer>();
static {
tokenSymbol.put(KW_AND, "&");
tokenSymbol.put(KW_OR, "|");
tokenSymbol.put(KW_NOT, "!");
tokenSymbol.put(EQUAL, "=");
tokenSymbol.put(NOTEQUAL, "<>");
tokenSymbol.put(EQUAL_NS, "<=>");
tokenSymbol.put(LESSTHANOREQUALTO, "<=");
tokenSymbol.put(LESSTHAN, "<");
tokenSymbol.put(GREATERTHANOREQUALTO, ">=");
tokenSymbol.put(GREATERTHAN, ">");
}
static {
tokenNames.put(KW_AND, "and");
tokenNames.put(KW_OR, "or");
tokenNames.put(KW_NOT, "not");
tokenNames.put(EQUAL, "equal");
tokenNames.put(NOTEQUAL, "not_equal");
tokenNames.put(EQUAL_NS, "equal_ns");
tokenNames.put(LESSTHANOREQUALTO, "lessthan_or_equalto");
tokenNames.put(LESSTHAN, "lessthan");
tokenNames.put(GREATERTHANOREQUALTO, "greaterthan_or_equalto");
tokenNames.put(GREATERTHAN, "greaterthan");
tokenNames.put(TOK_SELECT, "TOK_SELECT");
tokenNames.put(TOK_FROM, "TOK_FROM");
tokenNames.put(TOK_WHERE, "TOK_WHERE");
tokenNames.put(TOK_INSERT, "TOK_INSERT");
tokenNames.put(TOK_DELETE, "TOK_DELETE");
tokenNames.put(TOK_UPDATE, "TOK_UPDATE");
tokenNames.put(TOK_QUERY, "TOK_QUERY");
tokenNames.put(TOK_AUTHOR_CREATE, "TOK_AUTHOR_CREATE");
tokenNames.put(TOK_AUTHOR_DROP, "TOK_AUTHOR_DROP");
tokenNames.put(TOK_AUTHOR_GRANT, "TOK_AUTHOR_GRANT");
tokenNames.put(TOK_AUTHOR_REVOKE, "TOK_AUTHOR_REVOKE");
tokenNames.put(TOK_DATALOAD, "TOK_DATALOAD");
tokenNames.put(TOK_METADATA_CREATE, "TOK_METADATA_CREATE");
tokenNames.put(TOK_METADATA_DELETE, "TOK_METADATA_DELETE");
tokenNames.put(TOK_METADATA_SET_FILE_LEVEL, "TOK_METADATA_SET_FILE_LEVEL");
tokenNames.put(TOK_PORPERTY_CREATE, "TOK_PORPERTY_CREATE");
tokenNames.put(TOK_PORPERTY_ADD_LABEL, "TOK_PORPERTY_ADD_LABEL");
tokenNames.put(TOK_PORPERTY_DELETE_LABEL, "TOK_PORPERTY_DELETE_LABEL");
tokenNames.put(TOK_PORPERTY_LINK, "TOK_PORPERTY_LINK");
tokenNames.put(TOK_PORPERTY_UNLINK, "TOK_PORPERTY_UNLINK");
}
static {
reverseWords.put(KW_AND, KW_OR);
reverseWords.put(KW_OR, KW_AND);
reverseWords.put(EQUAL, NOTEQUAL);
reverseWords.put(NOTEQUAL, EQUAL);
reverseWords.put(LESSTHAN, GREATERTHANOREQUALTO);
reverseWords.put(GREATERTHANOREQUALTO, LESSTHAN);
reverseWords.put(LESSTHANOREQUALTO, GREATERTHAN);
reverseWords.put(GREATERTHAN, LESSTHANOREQUALTO);
}
public static boolean isReservedPath(String pathStr) {
return pathStr.equals(SQLConstant.RESERVED_TIME)
|| pathStr.equals(SQLConstant.RESERVED_FREQ)
|| pathStr.equals(SQLConstant.RESERVED_DELTA_OBJECT);
}
public static final String DELTA_OBJECT_NAME = "delta_object_name";
public static final String REGEX_PATH_SEPARATOR = "\\.";
public static final String PATH_SEPARATOR = ".";
public static final String RESERVED_TIME = "time";
public static final String RESERVED_FREQ = "freq";
public static final String RESERVED_DELTA_OBJECT = "device_name";
public static final String INT32 = "INT32";
public static final String INT64 = "INT64";
public static final String FLOAT = "FLOAT";
public static final String DOUBLE = "DOUBLE";
public static final String BYTE_ARRAY = "BYTE_ARRAY";
public static final int KW_AND = 1;
public static final int KW_OR = 2;
public static final int KW_NOT = 3;
public static final int EQUAL = 11;
public static final int NOTEQUAL = 12;
public static final int LESSTHANOREQUALTO = 13;
public static final int LESSTHAN = 14;
public static final int GREATERTHANOREQUALTO = 15;
public static final int GREATERTHAN = 16;
public static final int EQUAL_NS = 17;
public static final int TOK_SELECT = 21;
public static final int TOK_FROM = 22;
public static final int TOK_WHERE = 23;
public static final int TOK_INSERT = 24;
public static final int TOK_DELETE = 25;
public static final int TOK_UPDATE = 26;
public static final int TOK_QUERY = 27;
public static final int TOK_AUTHOR_CREATE = 41;
public static final int TOK_AUTHOR_DROP = 42;
public static final int TOK_AUTHOR_GRANT = 43;
public static final int TOK_AUTHOR_REVOKE = 44;
public static final int TOK_DATALOAD = 45;
public static final int TOK_METADATA_CREATE = 51;
public static final int TOK_METADATA_DELETE = 52;
public static final int TOK_METADATA_SET_FILE_LEVEL = 53;
public static final int TOK_PORPERTY_CREATE = 54;
public static final int TOK_PORPERTY_ADD_LABEL = 55;
public static final int TOK_PORPERTY_DELETE_LABEL = 56;
public static final int TOK_PORPERTY_LINK = 57;
public static final int TOK_PORPERTY_UNLINK = 58;
public static Map<Integer, String> tokenSymbol = new HashMap<Integer, String>();
public static Map<Integer, String> tokenNames = new HashMap<Integer, String>();
public static Map<Integer, Integer> reverseWords = new HashMap<Integer, Integer>();
static {
tokenSymbol.put(KW_AND, "&");
tokenSymbol.put(KW_OR, "|");
tokenSymbol.put(KW_NOT, "!");
tokenSymbol.put(EQUAL, "=");
tokenSymbol.put(NOTEQUAL, "<>");
tokenSymbol.put(EQUAL_NS, "<=>");
tokenSymbol.put(LESSTHANOREQUALTO, "<=");
tokenSymbol.put(LESSTHAN, "<");
tokenSymbol.put(GREATERTHANOREQUALTO, ">=");
tokenSymbol.put(GREATERTHAN, ">");
}
static {
tokenNames.put(KW_AND, "and");
tokenNames.put(KW_OR, "or");
tokenNames.put(KW_NOT, "not");
tokenNames.put(EQUAL, "equal");
tokenNames.put(NOTEQUAL, "not_equal");
tokenNames.put(EQUAL_NS, "equal_ns");
tokenNames.put(LESSTHANOREQUALTO, "lessthan_or_equalto");
tokenNames.put(LESSTHAN, "lessthan");
tokenNames.put(GREATERTHANOREQUALTO, "greaterthan_or_equalto");
tokenNames.put(GREATERTHAN, "greaterthan");
tokenNames.put(TOK_SELECT, "TOK_SELECT");
tokenNames.put(TOK_FROM, "TOK_FROM");
tokenNames.put(TOK_WHERE, "TOK_WHERE");
tokenNames.put(TOK_INSERT, "TOK_INSERT");
tokenNames.put(TOK_DELETE, "TOK_DELETE");
tokenNames.put(TOK_UPDATE, "TOK_UPDATE");
tokenNames.put(TOK_QUERY, "TOK_QUERY");
tokenNames.put(TOK_AUTHOR_CREATE, "TOK_AUTHOR_CREATE");
tokenNames.put(TOK_AUTHOR_DROP, "TOK_AUTHOR_DROP");
tokenNames.put(TOK_AUTHOR_GRANT, "TOK_AUTHOR_GRANT");
tokenNames.put(TOK_AUTHOR_REVOKE, "TOK_AUTHOR_REVOKE");
tokenNames.put(TOK_DATALOAD, "TOK_DATALOAD");
tokenNames.put(TOK_METADATA_CREATE, "TOK_METADATA_CREATE");
tokenNames.put(TOK_METADATA_DELETE, "TOK_METADATA_DELETE");
tokenNames.put(TOK_METADATA_SET_FILE_LEVEL, "TOK_METADATA_SET_FILE_LEVEL");
tokenNames.put(TOK_PORPERTY_CREATE, "TOK_PORPERTY_CREATE");
tokenNames.put(TOK_PORPERTY_ADD_LABEL, "TOK_PORPERTY_ADD_LABEL");
tokenNames.put(TOK_PORPERTY_DELETE_LABEL, "TOK_PORPERTY_DELETE_LABEL");
tokenNames.put(TOK_PORPERTY_LINK, "TOK_PORPERTY_LINK");
tokenNames.put(TOK_PORPERTY_UNLINK, "TOK_PORPERTY_UNLINK");
}
static {
reverseWords.put(KW_AND, KW_OR);
reverseWords.put(KW_OR, KW_AND);
reverseWords.put(EQUAL, NOTEQUAL);
reverseWords.put(NOTEQUAL, EQUAL);
reverseWords.put(LESSTHAN, GREATERTHANOREQUALTO);
reverseWords.put(GREATERTHANOREQUALTO, LESSTHAN);
reverseWords.put(LESSTHANOREQUALTO, GREATERTHAN);
reverseWords.put(GREATERTHAN, LESSTHANOREQUALTO);
}
public static boolean isReservedPath(String pathStr) {
return pathStr.equals(SQLConstant.RESERVED_TIME)
|| pathStr.equals(SQLConstant.RESERVED_FREQ)
|| pathStr.equals(SQLConstant.RESERVED_DELTA_OBJECT);
}
}
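The reverseWords table is what makes NOT elimination a pure rewrite: each comparison maps to its complement, and AND/OR swap per De Morgan's laws. A minimal check mirroring the static block above:

import org.apache.iotdb.tsfile.qp.common.SQLConstant;

class ReverseWordsDemo {
  static void demo() {
    // not (x < 10)  ==  (x >= 10)
    assert SQLConstant.reverseWords.get(SQLConstant.LESSTHAN)
        == SQLConstant.GREATERTHANOREQUALTO;
    // not (A and B)  ==  (not A) or (not B), so KW_AND maps to KW_OR
    assert SQLConstant.reverseWords.get(SQLConstant.KW_AND) == SQLConstant.KW_OR;
  }
}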
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -28,35 +28,36 @@ import java.util.List;
*/
public class SingleQuery {
private List<FilterOperator> columnFilterOperators;
private FilterOperator timeFilterOperator;
private FilterOperator valueFilterOperator;
private List<FilterOperator> columnFilterOperators;
private FilterOperator timeFilterOperator;
private FilterOperator valueFilterOperator;
public SingleQuery(List<FilterOperator> columnFilterOperators,
FilterOperator timeFilter, FilterOperator valueFilter) {
super();
this.columnFilterOperators = columnFilterOperators;
this.timeFilterOperator = timeFilter;
this.valueFilterOperator = valueFilter;
}
public SingleQuery(List<FilterOperator> columnFilterOperators,
FilterOperator timeFilter, FilterOperator valueFilter) {
super();
this.columnFilterOperators = columnFilterOperators;
this.timeFilterOperator = timeFilter;
this.valueFilterOperator = valueFilter;
}
public List<FilterOperator> getColumnFilterOperator() {
public List<FilterOperator> getColumnFilterOperator() {
return columnFilterOperators;
}
return columnFilterOperators;
}
public FilterOperator getTimeFilterOperator() {
return timeFilterOperator;
}
public FilterOperator getTimeFilterOperator() {
return timeFilterOperator;
}
public FilterOperator getValueFilterOperator() {
return valueFilterOperator;
}
public FilterOperator getValueFilterOperator() {
return valueFilterOperator;
}
@Override
public String toString() {
return "SingleQuery: \n" + columnFilterOperators + "\n" + timeFilterOperator + "\n" + valueFilterOperator;
}
@Override
public String toString() {
return "SingleQuery: \n" + columnFilterOperators + "\n" + timeFilterOperator + "\n"
+ valueFilterOperator;
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -26,43 +26,46 @@ import java.util.List;
*
*/
public class TSQueryPlan {
private List<String> paths = new ArrayList<>();
private FilterOperator timeFilterOperator;
private FilterOperator valueFilterOperator;
public TSQueryPlan(List<String> paths, FilterOperator timeFilter, FilterOperator valueFilter) {
this.paths = paths;
this.timeFilterOperator = timeFilter;
this.valueFilterOperator = valueFilter;
}
private List<String> paths = new ArrayList<>();
private FilterOperator timeFilterOperator;
private FilterOperator valueFilterOperator;
public List<String> getPaths() {
return paths;
}
public TSQueryPlan(List<String> paths, FilterOperator timeFilter, FilterOperator valueFilter) {
this.paths = paths;
this.timeFilterOperator = timeFilter;
this.valueFilterOperator = valueFilter;
}
public FilterOperator getTimeFilterOperator() {
return timeFilterOperator;
}
public List<String> getPaths() {
return paths;
}
public void setTimeFilterOperator(FilterOperator timeFilterOperator) {
this.timeFilterOperator = timeFilterOperator;
}
public FilterOperator getTimeFilterOperator() {
return timeFilterOperator;
}
public void setValueFilterOperator(FilterOperator valueFilterOperator) {
this.valueFilterOperator = valueFilterOperator;
}
public void setTimeFilterOperator(FilterOperator timeFilterOperator) {
this.timeFilterOperator = timeFilterOperator;
}
public FilterOperator getValueFilterOperator() {
return valueFilterOperator;
}
public void setValueFilterOperator(FilterOperator valueFilterOperator) {
this.valueFilterOperator = valueFilterOperator;
}
public String toString(){
String ret = "";
ret += paths.toString();
if(timeFilterOperator != null)
ret += timeFilterOperator.toString();
if(valueFilterOperator != null)
ret += valueFilterOperator.toString();
return ret;
public FilterOperator getValueFilterOperator() {
return valueFilterOperator;
}
public String toString() {
String ret = "";
ret += paths.toString();
if (timeFilterOperator != null) {
ret += timeFilterOperator.toString();
}
if (valueFilterOperator != null) {
ret += valueFilterOperator.toString();
}
return ret;
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -25,10 +25,10 @@ package org.apache.iotdb.tsfile.qp.exception;
*/
public class BasicOperatorException extends QueryProcessorException {
private static final long serialVersionUID = -2163809754074237707L;
private static final long serialVersionUID = -2163809754074237707L;
public BasicOperatorException(String msg) {
super(msg);
}
public BasicOperatorException(String msg) {
super(msg);
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -25,10 +25,10 @@ package org.apache.iotdb.tsfile.qp.exception;
*/
public class DNFOptimizeException extends LogicalOptimizeException {
private static final long serialVersionUID = 807384397361662482L;
private static final long serialVersionUID = 807384397361662482L;
public DNFOptimizeException(String msg) {
super(msg);
}
public DNFOptimizeException(String msg) {
super(msg);
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -24,10 +24,10 @@ package org.apache.iotdb.tsfile.qp.exception;
*/
public class LogicalOptimizeException extends QueryProcessorException {
private static final long serialVersionUID = -7098092782689670064L;
private static final long serialVersionUID = -7098092782689670064L;
public LogicalOptimizeException(String msg) {
super(msg);
}
public LogicalOptimizeException(String msg) {
super(msg);
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -21,10 +21,10 @@ package org.apache.iotdb.tsfile.qp.exception;
public class MergeFilterException extends LogicalOptimizeException {
private static final long serialVersionUID = 8581594261924961899L;
private static final long serialVersionUID = 8581594261924961899L;
public MergeFilterException(String msg) {
super(msg);
}
public MergeFilterException(String msg) {
super(msg);
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -20,10 +20,10 @@ package org.apache.iotdb.tsfile.qp.exception;
public class QueryOperatorException extends LogicalOptimizeException {
private static final long serialVersionUID = 8581594261924961899L;
private static final long serialVersionUID = 8581594261924961899L;
public QueryOperatorException(String msg) {
super(msg);
}
public QueryOperatorException(String msg) {
super(msg);
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -23,17 +23,19 @@ package org.apache.iotdb.tsfile.qp.exception;
*
*/
public class QueryProcessorException extends Exception {
private static final long serialVersionUID = -8987915921329335088L;
private String errMsg;
QueryProcessorException(String msg) {
super(msg);
this.errMsg = msg;
}
@Override
public String getMessage() {
return errMsg;
}
private static final long serialVersionUID = -8987915921329335088L;
private String errMsg;
QueryProcessorException(String msg) {
super(msg);
this.errMsg = msg;
}
@Override
public String getMessage() {
return errMsg;
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -19,17 +19,16 @@
package org.apache.iotdb.tsfile.qp.exception;
/**
* This exception is thrown when an error occurs while removing not operators.
*
*/
public class RemoveNotException extends LogicalOptimizeException {
private static final long serialVersionUID = -772591029262375715L;
private static final long serialVersionUID = -772591029262375715L;
public RemoveNotException(String msg) {
super(msg);
}
public RemoveNotException(String msg) {
super(msg);
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -18,142 +18,141 @@
*/
package org.apache.iotdb.tsfile.qp.optimizer;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.exception.DNFOptimizeException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.iotdb.tsfile.qp.common.SQLConstant.KW_AND;
import static org.apache.iotdb.tsfile.qp.common.SQLConstant.KW_OR;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.exception.DNFOptimizeException;
public class DNFFilterOptimizer implements IFilterOptimizer {
/**
* Get the DNF (disjunctive normal form) of this filter operator tree.
* Before invoking the getDNF function, make sure the operator tree is a binary tree.
* In other words, each non-leaf node has exactly two children.
*
* @param filter filter operator to be optimized
* @return FilterOperator
* @throws DNFOptimizeException exception in DNF optimizing
*/
@Override
public FilterOperator optimize(FilterOperator filter) throws DNFOptimizeException {
return getDNF(filter);
}
/**
* Get the DNF (disjunctive normal form) of this filter operator tree.
* Before invoking the getDNF function, make sure the operator tree is a binary tree.
* In other words, each non-leaf node has exactly two children.
*
* @param filter filter operator to be optimized
* @return FilterOperator
* @throws DNFOptimizeException exception in DNF optimizing
*/
@Override
public FilterOperator optimize(FilterOperator filter) throws DNFOptimizeException {
return getDNF(filter);
}
private FilterOperator getDNF(FilterOperator filter) throws DNFOptimizeException {
if (filter.isLeaf())
return filter;
List<FilterOperator> children = filter.getChildren();
if (children.size() != 2) {
throw new DNFOptimizeException("node :" + filter.getTokenSymbol() + " has "
+ children.size() + " children");
private FilterOperator getDNF(FilterOperator filter) throws DNFOptimizeException {
if (filter.isLeaf()) {
return filter;
}
List<FilterOperator> children = filter.getChildren();
if (children.size() != 2) {
throw new DNFOptimizeException("node :" + filter.getTokenSymbol() + " has "
+ children.size() + " children");
}
FilterOperator left = getDNF(children.get(0));
FilterOperator right = getDNF(children.get(1));
List<FilterOperator> newChildrenList = new ArrayList<>();
switch (filter.getTokenIntType()) {
case KW_OR:
addChildOpInOr(left, newChildrenList);
addChildOpInOr(right, newChildrenList);
break;
case KW_AND:
if (left.getTokenIntType() != KW_OR && right.getTokenIntType() != KW_OR) {
addChildOpInAnd(left, newChildrenList);
addChildOpInAnd(right, newChildrenList);
} else {
List<FilterOperator> leftAndChildren = getAndChild(left);
List<FilterOperator> rightAndChildren = getAndChild(right);
for (FilterOperator laChild : leftAndChildren) {
for (FilterOperator raChild : rightAndChildren) {
FilterOperator r = mergeToConjunction(laChild.clone(), raChild.clone());
newChildrenList.add(r);
}
}
filter.setTokenIntType(KW_OR);
}
FilterOperator left = getDNF(children.get(0));
FilterOperator right = getDNF(children.get(1));
List<FilterOperator> newChildrenList = new ArrayList<>();
switch (filter.getTokenIntType()) {
case KW_OR:
addChildOpInOr(left, newChildrenList);
addChildOpInOr(right, newChildrenList);
break;
case KW_AND:
if (left.getTokenIntType() != KW_OR && right.getTokenIntType() != KW_OR) {
addChildOpInAnd(left, newChildrenList);
addChildOpInAnd(right, newChildrenList);
} else {
List<FilterOperator> leftAndChildren = getAndChild(left);
List<FilterOperator> rightAndChildren = getAndChild(right);
for (FilterOperator laChild : leftAndChildren) {
for (FilterOperator raChild : rightAndChildren) {
FilterOperator r = mergeToConjunction(laChild.clone(), raChild.clone());
newChildrenList.add(r);
}
}
filter.setTokenIntType(KW_OR);
}
break;
default:
throw new DNFOptimizeException("get DNF failed, this tokenType is:"
+ filter.getTokenIntType());
}
filter.setChildrenList(newChildrenList);
return filter;
break;
default:
throw new DNFOptimizeException("get DNF failed, this tokenType is:"
+ filter.getTokenIntType());
}
filter.setChildrenList(newChildrenList);
return filter;
}
/**
* Merge two conjunction filter operators into one.<br>
* a conjunction operator consists of {@code FilterOperator}s joined by an inner operator whose token is
* KW_AND.<br>
* e.g. (a and b) merge (c) is (a and b and c)
*
* @param a
* @param b
* @return FilterOperator
* @throws DNFOptimizeException
*/
private FilterOperator mergeToConjunction(FilterOperator a, FilterOperator b)
throws DNFOptimizeException {
List<FilterOperator> retChildrenList = new ArrayList<>();
addChildOpInAnd(a, retChildrenList);
addChildOpInAnd(b, retChildrenList);
FilterOperator ret = new FilterOperator(KW_AND, false);
ret.setChildrenList(retChildrenList);
return ret;
}
/**
* Merge two conjunction filter operators into one.<br>
* a conjunction operator consists of {@code FilterOperator}s joined by an inner operator whose token is
* KW_AND.<br>
* e.g. (a and b) merge (c) is (a and b and c)
*
* @param a
* @param b
* @return FilterOperator
* @throws DNFOptimizeException
*/
private FilterOperator mergeToConjunction(FilterOperator a, FilterOperator b)
throws DNFOptimizeException {
List<FilterOperator> retChildrenList = new ArrayList<>();
addChildOpInAnd(a, retChildrenList);
addChildOpInAnd(b, retChildrenList);
FilterOperator ret = new FilterOperator(KW_AND, false);
ret.setChildrenList(retChildrenList);
/**
* Obtain the conjunction nodes according to the input filter operator's token type.
* If the token type == KW_OR, return its children. Otherwise, return a list containing the input filter operator.
*
* @param child
* @return List<FilterOperator>
*/
private List<FilterOperator> getAndChild(FilterOperator child) {
switch (child.getTokenIntType()) {
case KW_OR:
return child.getChildren();
default:
// other token type means leaf node or "and" operator
List<FilterOperator> ret = new ArrayList<>();
ret.add(child);
return ret;
}
}
/**
* Obtain the conjunction nodes according to the input filter operator's token type.
* If the token type == KW_OR, return its children. Otherwise, return a list containing the input filter operator.
*
* @param child
* @return List<FilterOperator>
*/
private List<FilterOperator> getAndChild(FilterOperator child) {
switch (child.getTokenIntType()) {
case KW_OR:
return child.getChildren();
default:
// other token type means leaf node or "and" operator
List<FilterOperator> ret = new ArrayList<>();
ret.add(child);
return ret;
}
}
/**
* @param child
* @param newChildrenList
* @throws DNFOptimizeException
*/
private void addChildOpInAnd(FilterOperator child, List<FilterOperator> newChildrenList)
throws DNFOptimizeException {
if (child.isLeaf())
newChildrenList.add(child);
else if (child.getTokenIntType() == KW_AND)
newChildrenList.addAll(child.getChildren());
else {
throw new DNFOptimizeException(
"add all children of an OR operator to newChildrenList in AND");
}
/**
* @param child
* @param newChildrenList
* @throws DNFOptimizeException
*/
private void addChildOpInAnd(FilterOperator child, List<FilterOperator> newChildrenList)
throws DNFOptimizeException {
if (child.isLeaf()) {
newChildrenList.add(child);
} else if (child.getTokenIntType() == KW_AND) {
newChildrenList.addAll(child.getChildren());
} else {
throw new DNFOptimizeException(
"add all children of an OR operator to newChildrenList in AND");
}
}
/**
* @param child
* @param newChildrenList
*/
private void addChildOpInOr(FilterOperator child, List<FilterOperator> newChildrenList) {
if (child.isLeaf() || child.getTokenIntType() == KW_AND) {
newChildrenList.add(child);
}
else{
newChildrenList.addAll(child.getChildren());
}
/**
* @param child
* @param newChildrenList
*/
private void addChildOpInOr(FilterOperator child, List<FilterOperator> newChildrenList) {
if (child.isLeaf() || child.getTokenIntType() == KW_AND) {
newChildrenList.add(child);
} else {
newChildrenList.addAll(child.getChildren());
}
}
}
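A worked example of the rewrite, with placeholder leaf predicates. Given the binary tree (a or b) and c, the KW_AND branch cross-multiplies the OR-children of each side, yielding (a and c) or (b and c):

import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.common.SQLConstant;
import org.apache.iotdb.tsfile.qp.exception.DNFOptimizeException;

class DNFDemo {
  static FilterOperator demo() throws DNFOptimizeException {
    FilterOperator a = new BasicOperator(SQLConstant.GREATERTHAN, "s1", "20");
    FilterOperator b = new BasicOperator(SQLConstant.LESSTHAN, "s1", "10");
    FilterOperator c = new BasicOperator(SQLConstant.GREATERTHAN, "s2", "10");
    FilterOperator or = new FilterOperator(SQLConstant.KW_OR);
    or.addChildOPerator(a);
    or.addChildOPerator(b);
    FilterOperator and = new FilterOperator(SQLConstant.KW_AND);
    and.addChildOPerator(or);
    and.addChildOPerator(c);
    // (a or b) and c  ==>  (a and c) or (b and c)
    return new DNFFilterOptimizer().optimize(and);
  }
}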
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -28,5 +28,7 @@ import org.apache.iotdb.tsfile.qp.exception.RemoveNotException;
*
*/
public interface IFilterOptimizer {
FilterOperator optimize(FilterOperator filter) throws RemoveNotException, DNFOptimizeException, MergeFilterException;
FilterOperator optimize(FilterOperator filter)
throws RemoveNotException, DNFOptimizeException, MergeFilterException;
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -18,121 +18,124 @@
*/
package org.apache.iotdb.tsfile.qp.optimizer;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.exception.MergeFilterException;
import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.exception.MergeFilterException;
public class MergeSingleFilterOptimizer implements IFilterOptimizer {
@Override
public FilterOperator optimize(FilterOperator filter) throws MergeFilterException {
mergeSamePathFilter(filter);
@Override
public FilterOperator optimize(FilterOperator filter) throws MergeFilterException {
mergeSamePathFilter(filter);
return filter;
}
return filter;
}
private String mergeSamePathFilter(FilterOperator filter) throws MergeFilterException {
if (filter.isLeaf())
return filter.getSinglePath();
List<FilterOperator> children = filter.getChildren();
if (children.isEmpty()) {
throw new MergeFilterException("this inner filter has no children!");
}
if (children.size() == 1) {
throw new MergeFilterException("this inner filter has just one child!");
}
String childPath = mergeSamePathFilter(children.get(0));
String tempPath;
for (int i = 1; i < children.size(); i++) {
tempPath = mergeSamePathFilter(children.get(i));
// if one of the children differs from the others or is not a single node (path == null), the filter's path
// is null
if (tempPath == null || !tempPath.equals(childPath))
childPath = null;
}
if (childPath != null) {
filter.setIsSingle(true);
filter.setSinglePath(childPath);
return childPath;
}
private String mergeSamePathFilter(FilterOperator filter) throws MergeFilterException {
if (filter.isLeaf()) {
return filter.getSinglePath();
}
List<FilterOperator> children = filter.getChildren();
if (children.isEmpty()) {
throw new MergeFilterException("this inner filter has no children!");
}
if (children.size() == 1) {
throw new MergeFilterException("this inner filter has just one child!");
}
String childPath = mergeSamePathFilter(children.get(0));
String tempPath;
for (int i = 1; i < children.size(); i++) {
tempPath = mergeSamePathFilter(children.get(i));
// if one of the children differs from the others or is not a single node (path == null), the filter's path
// is null
if (tempPath == null || !tempPath.equals(childPath)) {
childPath = null;
}
}
if (childPath != null) {
filter.setIsSingle(true);
filter.setSinglePath(childPath);
return childPath;
}
// sort children so that equal paths become adjacent
Collections.sort(children);
List<FilterOperator> ret = new ArrayList<>();
// sort children so that equal paths become adjacent
Collections.sort(children);
List<FilterOperator> ret = new ArrayList<>();
List<FilterOperator> tempExtrNode = null;
int i;
for (i = 0; i < children.size(); i++) {
tempPath = children.get(i).getSinglePath();
// sorted by path, all "null" paths are in the end
if (tempPath == null) {
break;
}
if (childPath == null) {
// first child to be added
childPath = tempPath;
tempExtrNode = new ArrayList<>();
tempExtrNode.add(children.get(i));
} else if (childPath.equals(tempPath)) {
// a successive single child with the same path; merge it with the previous children
tempExtrNode.add(children.get(i));
} else {
// no more children with the same path; wrap the nodes collected in tempExtrNode into a new node,
// taking care not to make a node which has only one child.
if (tempExtrNode.size() == 1) {
ret.add(tempExtrNode.get(0));
// reuse the existing object for efficiency
tempExtrNode.set(0, children.get(i));
childPath = tempPath;
} else {
// add a new inner node
FilterOperator newFilter = new FilterOperator(filter.getTokenIntType(), true);
newFilter.setSinglePath(childPath);
newFilter.setChildrenList(tempExtrNode);
ret.add(newFilter);
tempExtrNode = new ArrayList<>();
tempExtrNode.add(children.get(i));
childPath = tempPath;
}
}
}
// the last several children before the "not single paths" have not been added to the ret list.
if (childPath != null) {
if (tempExtrNode.size() == 1) {
ret.add(tempExtrNode.get(0));
} else {
// add a new inner node
FilterOperator newFil = new FilterOperator(filter.getTokenIntType(), true);
newFil.setSinglePath(childPath);
newFil.setChildrenList(tempExtrNode);
ret.add(newFil);
}
}
// add last null children
for (; i < children.size(); i++) {
ret.add(children.get(i));
}
if (ret.size() == 1) {
// all children have same path, which means this filter node is a single node
filter.setIsSingle(true);
filter.setSinglePath(childPath);
filter.setChildrenList(ret.get(0).getChildren());
return childPath;
List<FilterOperator> tempExtrNode = null;
int i;
for (i = 0; i < children.size(); i++) {
tempPath = children.get(i).getSinglePath();
// sorted by path, all "null" paths are in the end
if (tempPath == null) {
break;
}
if (childPath == null) {
// first child to be added
childPath = tempPath;
tempExtrNode = new ArrayList<>();
tempExtrNode.add(children.get(i));
} else if (childPath.equals(tempPath)) {
// successive single child with the same path; merge it with the previous children
tempExtrNode.add(children.get(i));
} else {
// path changed: wrap the existing nodes in tempExtrNode into a new node,
// but avoid creating a node that has only one child.
if (tempExtrNode.size() == 1) {
ret.add(tempExtrNode.get(0));
// reuse the existing object directly for efficiency
tempExtrNode.set(0, children.get(i));
childPath = tempPath;
} else {
filter.setIsSingle(false);
filter.setChildrenList(ret);
return null;
// add a new inner node
FilterOperator newFilter = new FilterOperator(filter.getTokenIntType(), true);
newFilter.setSinglePath(childPath);
newFilter.setChildrenList(tempExtrNode);
ret.add(newFilter);
tempExtrNode = new ArrayList<>();
tempExtrNode.add(children.get(i));
childPath = tempPath;
}
}
}
// the last several children before the "not single paths" have not been added to the ret list.
if (childPath != null) {
if (tempExtrNode.size() == 1) {
ret.add(tempExtrNode.get(0));
} else {
// add a new inner node
FilterOperator newFil = new FilterOperator(filter.getTokenIntType(), true);
newFil.setSinglePath(childPath);
newFil.setChildrenList(tempExtrNode);
ret.add(newFil);
}
}
// add last null children
for (; i < children.size(); i++) {
ret.add(children.get(i));
}
if (ret.size() == 1) {
// all children have same path, which means this filter node is a single node
filter.setIsSingle(true);
filter.setSinglePath(childPath);
filter.setChildrenList(ret.get(0).getChildren());
return childPath;
} else {
filter.setIsSingle(false);
filter.setChildrenList(ret);
return null;
}
}
private boolean allIsBasic(List<FilterOperator> children) {
for(FilterOperator child: children) {
if(!(child instanceof BasicOperator))
return false;
}
return true;
private boolean allIsBasic(List<FilterOperator> children) {
for (FilterOperator child : children) {
if (!(child instanceof BasicOperator)) {
return false;
}
}
return true;
}
}
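To make the merge concrete, here is a minimal, hypothetical usage sketch (not part of this patch). It assumes the BasicOperator(int, String, String) and FilterOperator(int, boolean) constructors and the setChildrenList/getSinglePath accessors behave as they are used elsewhere in this diff, and that a leaf BasicOperator reports its series via getSinglePath. After optimizing, an AND of two leaves on the same series should itself become a single-path node:

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.common.SQLConstant;

public class MergeSingleFilterSketch {

  public static void main(String[] args) throws Exception {
    // two leaf comparisons on the same series path "s1"
    BasicOperator left = new BasicOperator(SQLConstant.GREATERTHAN, "s1", "10");
    BasicOperator right = new BasicOperator(SQLConstant.LESSTHAN, "s1", "20");

    // an AND node that is not yet marked single-path
    FilterOperator and = new FilterOperator(SQLConstant.KW_AND, false);
    List<FilterOperator> children = new ArrayList<>();
    children.add(left);
    children.add(right);
    and.setChildrenList(children);

    new MergeSingleFilterOptimizer().optimize(and);
    // both children share path "s1", so the AND node now carries it
    System.out.println(and.getSinglePath()); // expected: s1
  }
}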
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -18,208 +18,223 @@
*/
package org.apache.iotdb.tsfile.qp.optimizer;
import org.apache.iotdb.tsfile.qp.common.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.common.SQLConstant;
import org.apache.iotdb.tsfile.qp.common.SingleQuery;
import org.apache.iotdb.tsfile.qp.common.TSQueryPlan;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.util.*;
public class PhysicalOptimizer {
//determines whether to query all delta_objects from the TSFile. true means do the query.
private boolean flag;
private List<String> validDeltaObjects = new ArrayList<>();
private List<String> columnNames;
public PhysicalOptimizer(List<String> columnNames) {
this.columnNames = columnNames;
//determines whether to query all delta_objects from the TSFile. true means do the query.
private boolean flag;
private List<String> validDeltaObjects = new ArrayList<>();
private List<String> columnNames;
public PhysicalOptimizer(List<String> columnNames) {
this.columnNames = columnNames;
}
public List<TSQueryPlan> optimize(SingleQuery singleQuery, List<String> paths,
TsFileSequenceReader in, Long start, Long end) throws IOException {
List<String> actualDeltaObjects = in.getDeviceNameInRange(start, end);
List<MeasurementSchema> actualSeries = in.readFileMetadata().getMeasurementSchemaList();
List<String> selectedSeries = new ArrayList<>();
for (String path : paths) {
if (!columnNames.contains(path) && !path.equals(SQLConstant.RESERVED_TIME)) {
selectedSeries.add(path);
}
}
public List<TSQueryPlan> optimize(SingleQuery singleQuery, List<String> paths,
TsFileSequenceReader in, Long start, Long end) throws IOException {
List<String> actualDeltaObjects = in.getDeviceNameInRange(start, end);
List<MeasurementSchema> actualSeries = in.readFileMetadata().getMeasurementSchemaList();
List<String> selectedSeries = new ArrayList<>();
for (String path : paths) {
if (!columnNames.contains(path) && !path.equals(SQLConstant.RESERVED_TIME)) {
selectedSeries.add(path);
}
}
FilterOperator timeFilter = null;
FilterOperator valueFilter = null;
if (singleQuery != null) {
timeFilter = singleQuery.getTimeFilterOperator();
valueFilter = singleQuery.getValueFilterOperator();
if (valueFilter != null) {
List<String> filterPaths = valueFilter.getAllPaths();
List<String> actualPaths = new ArrayList<>();
for (MeasurementSchema series : actualSeries) {
actualPaths.add(series.getMeasurementId());
}
//if the filter paths don't exist in the tsfile, don't query
if (!actualPaths.containsAll(filterPaths))
return new ArrayList<>();
}
flag = true;
Map<String, Set<String>> selectColumns = mergeColumns(singleQuery.getColumnFilterOperator());
if (!flag) {
//e.g. where column1 = 'd1' and column1 = 'd2', should not query
return new ArrayList<>();
}
//if deltaObject columns are selected, match them against the actual deltaObjects
if (!selectColumns.isEmpty()) {
combination(actualDeltaObjects, selectColumns, selectColumns.keySet().toArray(), 0, new String[selectColumns.size()]);
} else {
validDeltaObjects.addAll(in.getDeviceNameInRange(start, end));
}
} else {
validDeltaObjects.addAll(in.getDeviceNameInRange(start, end));
FilterOperator timeFilter = null;
FilterOperator valueFilter = null;
if (singleQuery != null) {
timeFilter = singleQuery.getTimeFilterOperator();
valueFilter = singleQuery.getValueFilterOperator();
if (valueFilter != null) {
List<String> filterPaths = valueFilter.getAllPaths();
List<String> actualPaths = new ArrayList<>();
for (MeasurementSchema series : actualSeries) {
actualPaths.add(series.getMeasurementId());
}
List<MeasurementSchema> fileSeries = in.readFileMetadata().getMeasurementSchemaList();
Set<String> seriesSet = new HashSet<>();
for (MeasurementSchema series : fileSeries) {
seriesSet.add(series.getMeasurementId());
}
//query all measurements from TSFile
if (selectedSeries.size() == 0) {
for (MeasurementSchema series : actualSeries) {
selectedSeries.add(series.getMeasurementId());
}
} else {
//remove paths that don't exist in the file
selectedSeries.removeIf(path -> !seriesSet.contains(path));
//if the filter paths don't exist in the tsfile, don't query
if (!actualPaths.containsAll(filterPaths)) {
return new ArrayList<>();
}
}
flag = true;
Map<String, Set<String>> selectColumns = mergeColumns(singleQuery.getColumnFilterOperator());
if (!flag) {
//e.g. where column1 = 'd1' and column1 = 'd2', should not query
return new ArrayList<>();
}
//if deltaObject columns are selected, match them against the actual deltaObjects
if (!selectColumns.isEmpty()) {
combination(actualDeltaObjects, selectColumns, selectColumns.keySet().toArray(), 0,
new String[selectColumns.size()]);
} else {
validDeltaObjects.addAll(in.getDeviceNameInRange(start, end));
}
} else {
validDeltaObjects.addAll(in.getDeviceNameInRange(start, end));
}
List<TSQueryPlan> tsFileQueries = new ArrayList<>();
for (String deltaObject : validDeltaObjects) {
List<String> newPaths = new ArrayList<>();
for (String path : selectedSeries) {
String newPath = deltaObject + SQLConstant.PATH_SEPARATOR + path;
newPaths.add(newPath);
}
if (valueFilter == null) {
tsFileQueries.add(new TSQueryPlan(newPaths, timeFilter, null));
} else {
FilterOperator newValueFilter = valueFilter.clone();
newValueFilter.addHeadDeltaObjectPath(deltaObject);
tsFileQueries.add(new TSQueryPlan(newPaths, timeFilter, newValueFilter));
}
}
return tsFileQueries;
List<MeasurementSchema> fileSeries = in.readFileMetadata().getMeasurementSchemaList();
Set<String> seriesSet = new HashSet<>();
for (MeasurementSchema series : fileSeries) {
seriesSet.add(series.getMeasurementId());
}
//query all measurements from TSFile
if (selectedSeries.size() == 0) {
for (MeasurementSchema series : actualSeries) {
selectedSeries.add(series.getMeasurementId());
}
} else {
//remove paths that don't exist in the file
selectedSeries.removeIf(path -> !seriesSet.contains(path));
}
/**
* calculate combinations of selected columns and add valid deltaObjects to validDeltaObjects
*
* @param actualDeltaObjects deltaObjects from file
* @param columnValues e.g. (device:{d1,d2}) (board:{c1,c2}) or (delta_object:{d1,d2})
* @param columns e.g. device, board
* @param beginIndex current recursion list index
* @param values combination of column values
*/
private void combination(List<String> actualDeltaObjects, Map<String, Set<String>> columnValues, Object[] columns, int beginIndex, String[] values) {
// the filter columns should appear in columnNames -> currently just device_name
// use the delta_object column
if (columnValues.containsKey(SQLConstant.RESERVED_DELTA_OBJECT)) {
Set<String> delta_objects = columnValues.get(SQLConstant.RESERVED_DELTA_OBJECT);
for (String delta_object : delta_objects) {
if (actualDeltaObjects.contains(delta_object))
validDeltaObjects.add(delta_object);
}
return;
List<TSQueryPlan> tsFileQueries = new ArrayList<>();
for (String deltaObject : validDeltaObjects) {
List<String> newPaths = new ArrayList<>();
for (String path : selectedSeries) {
String newPath = deltaObject + SQLConstant.PATH_SEPARATOR + path;
newPaths.add(newPath);
}
if (valueFilter == null) {
tsFileQueries.add(new TSQueryPlan(newPaths, timeFilter, null));
} else {
FilterOperator newValueFilter = valueFilter.clone();
newValueFilter.addHeadDeltaObjectPath(deltaObject);
tsFileQueries.add(new TSQueryPlan(newPaths, timeFilter, newValueFilter));
}
}
return tsFileQueries;
}
/**
* calculate combinations of selected columns and add valid deltaObjects to validDeltaObjects
*
* @param actualDeltaObjects deltaObjects from file
* @param columnValues e.g. (device:{d1,d2}) (board:{c1,c2}) or (delta_object:{d1,d2})
* @param columns e.g. device, board
* @param beginIndex current recursion list index
* @param values combination of column values
*/
private void combination(List<String> actualDeltaObjects, Map<String, Set<String>> columnValues,
Object[] columns, int beginIndex, String[] values) {
// the filter columns should appear in columnNames -> currently just device_name
// use the delta_object column
if (columnValues.containsKey(SQLConstant.RESERVED_DELTA_OBJECT)) {
Set<String> delta_objects = columnValues.get(SQLConstant.RESERVED_DELTA_OBJECT);
for (String delta_object : delta_objects) {
if (actualDeltaObjects.contains(delta_object)) {
validDeltaObjects.add(delta_object);
}
}
return;
}
if (beginIndex == columns.length) {
for (String deltaObject : actualDeltaObjects) {
boolean valid = true;
//if deltaObject is root.column1_value.column2_value then
//actualValues is [root, column1_value, column2_value]
String[] actualValues = deltaObject.split(SQLConstant.REGEX_PATH_SEPARATOR);
for (int i = 0; i < columns.length; i++) {
int columnIndex = columnNames.indexOf(columns[i].toString());
if (!actualValues[columnIndex].equals(values[i])) {
valid = false;
}
}
if (valid)
validDeltaObjects.add(deltaObject);
}
return;
if (beginIndex == columns.length) {
for (String deltaObject : actualDeltaObjects) {
boolean valid = true;
//if deltaObject is root.column1_value.column2_value then
//actualValues is [root, column1_value, column2_value]
String[] actualValues = deltaObject.split(SQLConstant.REGEX_PATH_SEPARATOR);
for (int i = 0; i < columns.length; i++) {
int columnIndex = columnNames.indexOf(columns[i].toString());
if (!actualValues[columnIndex].equals(values[i])) {
valid = false;
}
}
for (String c : columnValues.get(columns[beginIndex].toString())) {
values[beginIndex] = c;
combination(actualDeltaObjects, columnValues, columns, beginIndex + 1, values);
if (valid) {
validDeltaObjects.add(deltaObject);
}
}
return;
}
private Map<String, Set<String>> mergeColumns(List<FilterOperator> columnFilterOperators) {
Map<String, Set<String>> column_values_map = new HashMap<>();
for (FilterOperator filterOperator : columnFilterOperators) {
Pair<String, Set<String>> column_values = mergeColumn(filterOperator);
if (column_values != null && !column_values.right.isEmpty())
column_values_map.put(column_values.left, column_values.right);
}
return column_values_map;
for (String c : columnValues.get(columns[beginIndex].toString())) {
values[beginIndex] = c;
combination(actualDeltaObjects, columnValues, columns, beginIndex + 1, values);
}
/**
* merge one column filterOperator
*
* @param columnFilterOperator column filter
* @return selected values of the column filter
*/
private Pair<String, Set<String>> mergeColumn(FilterOperator columnFilterOperator) {
if (columnFilterOperator == null) {
return null;
}
if (columnFilterOperator.isLeaf()) {
// special case : not equal
if (columnFilterOperator.getTokenIntType() == SQLConstant.NOTEQUAL) {
return null;
}
//
Set<String> ret = new HashSet<>();
ret.add(((BasicOperator) columnFilterOperator).getSeriesValue());
return new Pair<>(columnFilterOperator.getSinglePath(), ret);
}
List<FilterOperator> children = columnFilterOperator.getChildren();
if (children == null || children.isEmpty()) {
return new Pair<>(null, new HashSet<>());
}
Pair<String, Set<String>> ret = mergeColumn(children.get(0));
if (ret == null) {
return null;
}
for (int i = 1; i < children.size(); i++) {
Pair<String, Set<String>> temp = mergeColumn(children.get(i));
if (temp == null) {
return null;
}
switch (columnFilterOperator.getTokenIntType()) {
case SQLConstant.KW_AND:
ret.right.retainAll(temp.right);
//example: "where device = d1 and device = d2" should not query data
if (ret.right.isEmpty()) {
flag = false;
}
break;
case SQLConstant.KW_OR:
ret.right.addAll(temp.right);
break;
default:
throw new UnsupportedOperationException("given error token type:" + columnFilterOperator.getTokenIntType());
}
}
return ret;
}
private Map<String, Set<String>> mergeColumns(List<FilterOperator> columnFilterOperators) {
Map<String, Set<String>> column_values_map = new HashMap<>();
for (FilterOperator filterOperator : columnFilterOperators) {
Pair<String, Set<String>> column_values = mergeColumn(filterOperator);
if (column_values != null && !column_values.right.isEmpty()) {
column_values_map.put(column_values.left, column_values.right);
}
}
return column_values_map;
}
/**
* merge one column filterOperator
*
* @param columnFilterOperator column filter
* @return selected values of the column filter
*/
private Pair<String, Set<String>> mergeColumn(FilterOperator columnFilterOperator) {
if (columnFilterOperator == null) {
return null;
}
if (columnFilterOperator.isLeaf()) {
// special case : not equal
if (columnFilterOperator.getTokenIntType() == SQLConstant.NOTEQUAL) {
return null;
}
//
Set<String> ret = new HashSet<>();
ret.add(((BasicOperator) columnFilterOperator).getSeriesValue());
return new Pair<>(columnFilterOperator.getSinglePath(), ret);
}
List<FilterOperator> children = columnFilterOperator.getChildren();
if (children == null || children.isEmpty()) {
return new Pair<>(null, new HashSet<>());
}
Pair<String, Set<String>> ret = mergeColumn(children.get(0));
if (ret == null) {
return null;
}
for (int i = 1; i < children.size(); i++) {
Pair<String, Set<String>> temp = mergeColumn(children.get(i));
if (temp == null) {
return null;
}
switch (columnFilterOperator.getTokenIntType()) {
case SQLConstant.KW_AND:
ret.right.retainAll(temp.right);
//example: "where device = d1 and device = d2" should not query data
if (ret.right.isEmpty()) {
flag = false;
}
break;
case SQLConstant.KW_OR:
ret.right.addAll(temp.right);
break;
default:
throw new UnsupportedOperationException(
"given error token type:" + columnFilterOperator.getTokenIntType());
}
}
return ret;
}
}
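The two private helpers above do the real pruning work: mergeColumn intersects a column's candidate values under AND (setting flag to false when the intersection becomes empty) and unions them under OR, and combination then enumerates the cross product of the merged values, keeping only the deltaObjects whose path segments match. A self-contained sketch of that enumeration, with hypothetical names and data, independent of the classes in this patch:

import java.util.*;

public class CombinationSketch {

  // deltaObjects as they would come from the file: root.<device>.<board>
  static final List<String> ACTUAL = Arrays.asList("root.d1.b1", "root.d1.b2", "root.d2.b1");

  // mirrors columnNames: segment 1 = device, segment 2 = board
  static final List<String> COLUMN_NAMES = Arrays.asList("root", "device", "board");

  static final List<String> valid = new ArrayList<>();

  public static void main(String[] args) {
    // "where device = 'd1' and (board = 'b1' or board = 'b2')" after merging:
    Map<String, Set<String>> columnValues = new LinkedHashMap<>();
    columnValues.put("device", new HashSet<>(Arrays.asList("d1")));
    columnValues.put("board", new HashSet<>(Arrays.asList("b1", "b2")));

    Object[] columns = columnValues.keySet().toArray();
    combination(columnValues, columns, 0, new String[columns.length]);
    System.out.println(valid); // [root.d1.b1, root.d1.b2]
  }

  static void combination(Map<String, Set<String>> columnValues, Object[] columns,
      int beginIndex, String[] values) {
    if (beginIndex == columns.length) {
      // a full combination is chosen; keep deltaObjects whose segments match it
      for (String deltaObject : ACTUAL) {
        String[] actualValues = deltaObject.split("\\.");
        boolean matches = true;
        for (int i = 0; i < columns.length; i++) {
          int columnIndex = COLUMN_NAMES.indexOf(columns[i].toString());
          if (!actualValues[columnIndex].equals(values[i])) {
            matches = false;
          }
        }
        if (matches) {
          valid.add(deltaObject);
        }
      }
      return;
    }
    // recurse over every candidate value of the current column
    for (String v : columnValues.get(columns[beginIndex].toString())) {
      values[beginIndex] = v;
      combination(columnValues, columns, beginIndex + 1, values);
    }
  }
}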
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
......@@ -19,86 +19,90 @@
package org.apache.iotdb.tsfile.qp.optimizer;
import static org.apache.iotdb.tsfile.qp.common.SQLConstant.KW_AND;
import static org.apache.iotdb.tsfile.qp.common.SQLConstant.KW_NOT;
import static org.apache.iotdb.tsfile.qp.common.SQLConstant.KW_OR;
import java.util.List;
import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.tsfile.qp.common.SQLConstant;
import org.apache.iotdb.tsfile.qp.exception.BasicOperatorException;
import org.apache.iotdb.tsfile.qp.exception.RemoveNotException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.iotdb.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.tsfile.qp.common.SQLConstant;
import org.apache.iotdb.tsfile.qp.exception.BasicOperatorException;
import java.util.List;
import static org.apache.iotdb.tsfile.qp.common.SQLConstant.KW_AND;
import static org.apache.iotdb.tsfile.qp.common.SQLConstant.KW_NOT;
import static org.apache.iotdb.tsfile.qp.common.SQLConstant.KW_OR;
public class RemoveNotOptimizer implements IFilterOptimizer {
private static final Logger LOG = LoggerFactory.getLogger(RemoveNotOptimizer.class);
/**
* get DNF(disjunctive normal form) for this filter operator tree. Before getDNF, this op tree
* must be binary; in other words, each non-leaf node has exactly two children.
*
* @param filter filter operator to be optimized
* @return optimized filter operator
* @throws RemoveNotException exception thrown during remove-not optimizing
*/
@Override
public FilterOperator optimize(FilterOperator filter) throws RemoveNotException {
return removeNot(filter);
}
private static final Logger LOG = LoggerFactory.getLogger(RemoveNotOptimizer.class);
private FilterOperator removeNot(FilterOperator filter) throws RemoveNotException {
if (filter.isLeaf())
return filter;
int tokenInt = filter.getTokenIntType();
switch (tokenInt) {
case KW_AND:
case KW_OR:
// replace children in-place for efficiency
List<FilterOperator> children = filter.getChildren();
children.set(0, removeNot(children.get(0)));
children.set(1, removeNot(children.get(1)));
return filter;
case KW_NOT:
try {
return reverseFilter(filter.getChildren().get(0));
} catch (BasicOperatorException e) {
LOG.error("reverse Filter failed.");
}
default:
throw new RemoveNotException("Unknown token in removeNot: " + tokenInt + ","
+ SQLConstant.tokenNames.get(tokenInt));
/**
* get DNF(disjunctive normal form) for this filter operator tree. Before getDNF, this op tree
* must be binary; in other words, each non-leaf node has exactly two children.
*
* @param filter filter operator to be optimized
* @return optimized filter operator
* @throws RemoveNotException exception thrown during remove-not optimizing
*/
@Override
public FilterOperator optimize(FilterOperator filter) throws RemoveNotException {
return removeNot(filter);
}
private FilterOperator removeNot(FilterOperator filter) throws RemoveNotException {
if (filter.isLeaf()) {
return filter;
}
int tokenInt = filter.getTokenIntType();
switch (tokenInt) {
case KW_AND:
case KW_OR:
// replace children in-place for efficiency
List<FilterOperator> children = filter.getChildren();
children.set(0, removeNot(children.get(0)));
children.set(1, removeNot(children.get(1)));
return filter;
case KW_NOT:
try {
return reverseFilter(filter.getChildren().get(0));
} catch (BasicOperatorException e) {
LOG.error("reverse Filter failed.");
}
default:
throw new RemoveNotException("Unknown token in removeNot: " + tokenInt + ","
+ SQLConstant.tokenNames.get(tokenInt));
}
}
private FilterOperator reverseFilter(FilterOperator filter) throws RemoveNotException, BasicOperatorException {
int tokenInt = filter.getTokenIntType();
if (filter.isLeaf()) {
try {
((BasicOperator) filter).setReversedTokenIntType();
} catch (BasicOperatorException e) {
throw new RemoveNotException(
"convert BasicFuntion to reserved meet failed: previous token:" + tokenInt
+ "tokenSymbol:" + SQLConstant.tokenNames.get(tokenInt));
}
return filter;
}
switch (tokenInt) {
case KW_AND:
case KW_OR:
List<FilterOperator> children = filter.getChildren();
children.set(0, reverseFilter(children.get(0)));
children.set(1, reverseFilter(children.get(1)));
filter.setTokenIntType(SQLConstant.reverseWords.get(tokenInt));
return filter;
case KW_NOT:
return removeNot(filter.getChildren().get(0));
default:
throw new RemoveNotException("Unknown token in reverseFilter: " + tokenInt + ","
+ SQLConstant.tokenNames.get(tokenInt));
}
private FilterOperator reverseFilter(FilterOperator filter)
throws RemoveNotException, BasicOperatorException {
int tokenInt = filter.getTokenIntType();
if (filter.isLeaf()) {
try {
((BasicOperator) filter).setReversedTokenIntType();
} catch (BasicOperatorException e) {
throw new RemoveNotException(
"convert BasicFuntion to reserved meet failed: previous token:" + tokenInt
+ "tokenSymbol:" + SQLConstant.tokenNames.get(tokenInt));
}
return filter;
}
switch (tokenInt) {
case KW_AND:
case KW_OR:
List<FilterOperator> children = filter.getChildren();
children.set(0, reverseFilter(children.get(0)));
children.set(1, reverseFilter(children.get(1)));
filter.setTokenIntType(SQLConstant.reverseWords.get(tokenInt));
return filter;
case KW_NOT:
return removeNot(filter.getChildren().get(0));
default:
throw new RemoveNotException("Unknown token in reverseFilter: " + tokenInt + ","
+ SQLConstant.tokenNames.get(tokenInt));
}
}
}
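The rewrite above is De Morgan plus operator reversal: NOT(a AND b) becomes (NOT a) OR (NOT b), NOT(NOT a) cancels, and a negated leaf flips its comparison token (the job of setReversedTokenIntType). A self-contained sketch of the same algorithm on a hypothetical Node type, independent of the FilterOperator classes:

import java.util.Arrays;
import java.util.List;

public class RemoveNotSketch {

  static class Node {
    String op;           // "AND", "OR", "NOT", or a comparison such as ">" / "<="
    String path, value;  // set only on leaves
    List<Node> children;

    Node(String op, List<Node> children) { this.op = op; this.children = children; }
    Node(String op, String path, String value) { this.op = op; this.path = path; this.value = value; }
    boolean isLeaf() { return children == null; }
  }

  static Node removeNot(Node n) {
    if (n.isLeaf()) return n;
    switch (n.op) {
      case "AND":
      case "OR":
        n.children.set(0, removeNot(n.children.get(0)));
        n.children.set(1, removeNot(n.children.get(1)));
        return n;
      case "NOT":
        return reverse(n.children.get(0));
      default:
        throw new IllegalArgumentException("unknown token: " + n.op);
    }
  }

  static Node reverse(Node n) {
    if (n.isLeaf()) {
      n.op = reversedOp(n.op); // e.g. ">" becomes "<="
      return n;
    }
    switch (n.op) {
      case "AND":
      case "OR":
        n.children.set(0, reverse(n.children.get(0)));
        n.children.set(1, reverse(n.children.get(1)));
        n.op = n.op.equals("AND") ? "OR" : "AND"; // De Morgan
        return n;
      case "NOT":
        return removeNot(n.children.get(0)); // double negation cancels
      default:
        throw new IllegalArgumentException("unknown token: " + n.op);
    }
  }

  static String reversedOp(String op) {
    switch (op) {
      case ">": return "<=";
      case "<": return ">=";
      case ">=": return "<";
      case "<=": return ">";
      case "=": return "!=";
      case "!=": return "=";
      default: throw new IllegalArgumentException(op);
    }
  }

  public static void main(String[] args) {
    // NOT (s1 > 10 AND s2 < 5)  =>  s1 <= 10 OR s2 >= 5
    Node tree = new Node("NOT", Arrays.asList(
        new Node("AND", Arrays.asList(
            new Node(">", "s1", "10"),
            new Node("<", "s2", "5")))));
    Node out = removeNot(tree);
    System.out.println(out.op + " [" + out.children.get(0).op + " "
        + out.children.get(1).op + "]"); // prints: OR [<= >=]
  }
}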
......@@ -20,14 +20,8 @@ package org.apache.iotdb.tsfile
import java.util
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor
import org.apache.iotdb.tsfile.common.constant.QueryConstant
import org.apache.iotdb.tsfile.file.metadata.enums.{TSDataType, TSEncoding}
import org.apache.iotdb.tsfile.read.common.{Field, Path}
import org.apache.iotdb.tsfile.write.record.TSRecord
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint
import org.apache.iotdb.tsfile.write.schema.{MeasurementSchema, Schema, SchemaBuilder}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType
import org.apache.iotdb.tsfile.read.common.Field
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
......@@ -98,8 +92,8 @@ abstract class Converter {
case t: StructType => Some(t)
case _ => throw new RuntimeException(
s"""TSFile schema cannot be converted to a Spark SQL StructType:
|${tsfileSchema.toString}
|""".stripMargin)
|${tsfileSchema.toString}
|""".stripMargin)
}
}
......
......@@ -56,7 +56,8 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
val conf = spark.sparkContext.hadoopConfiguration
//check if the path is given
options.getOrElse(DefaultSource.path, throw new TSFileDataSourceException(s"${DefaultSource.path} must be specified for org.apache.iotdb.tsfile DataSource"))
options.getOrElse(DefaultSource.path, throw new TSFileDataSourceException(
s"${DefaultSource.path} must be specified for org.apache.iotdb.tsfile DataSource"))
if (options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")) {
val tsfileSchema = NarrowConverter.getUnionSeries(files, conf)
......@@ -86,13 +87,15 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
= {
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val log = LoggerFactory.getLogger(classOf[DefaultSource])
log.info("This partition starts from " + file.start.asInstanceOf[java.lang.Long] + " and ends at " + (file.start + file.length).asInstanceOf[java.lang.Long])
log.info("This partition starts from " + file.start.asInstanceOf[java.lang.Long]
+ " and ends at " + (file.start + file.length).asInstanceOf[java.lang.Long])
log.info(file.toString())
val conf = broadcastedConf.value.value
......@@ -100,25 +103,30 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
Option(TaskContext.get()).foreach { taskContext => {
taskContext.addTaskCompletionListener { _ => in.close() }
log.info("task Id: " + taskContext.taskAttemptId() + " partition Id: " + taskContext.partitionId())
}
}
val tsFileMetaData = reader.readFileMetadata
// get queriedSchema from requiredSchema
var queriedSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData)
val readTsFile: ReadOnlyTsFile = new ReadOnlyTsFile(reader)
Option(TaskContext.get()).foreach { taskContext => {
taskContext.addTaskCompletionListener { _ => readTsFile.close() }
log.info("task Id: " + taskContext.taskAttemptId() + " partition Id: " +
taskContext.partitionId())
}
}
if (options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")) {
val device_names = tsFileMetaData.getDeviceMap.keySet()
val measurement_names = tsFileMetaData.getMeasurementSchema.keySet()
// construct queryExpression based on queriedSchema and filters
val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, device_names, measurement_names, filters, reader, file.start.asInstanceOf[java.lang.Long], (file.start + file.length).asInstanceOf[java.lang.Long])
val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, device_names,
measurement_names, filters, reader, file.start.asInstanceOf[java.lang.Long],
(file.start + file.length).asInstanceOf[java.lang.Long])
val queryDataSets = Executor.query(readTsFile, queryExpressions,
file.start.asInstanceOf[java.lang.Long],
(file.start + file.length).asInstanceOf[java.lang.Long])
val queryDataSets = Executor.query(readTsFile, queryExpressions, file.start.asInstanceOf[java.lang.Long], (file.start + file.length).asInstanceOf[java.lang.Long])
var queryDataSet: QueryDataSet = null
var device_name: String = null
......@@ -155,8 +163,6 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
}
override def next(): InternalRow = {
queryNext()
val curRecord = queryDataSet.next()
val fields = curRecord.getFields
val paths = queryDataSet.getPaths
......@@ -171,7 +177,8 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
rowBuffer(index) = device_name
}
else {
val pos = paths.indexOf(new org.apache.iotdb.tsfile.read.common.Path(device_name, field.name))
val pos = paths.indexOf(new org.apache.iotdb.tsfile.read.common.Path(device_name,
field.name))
var curField: Field = null
if (pos != -1) {
curField = fields.get(pos)
......@@ -186,11 +193,15 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
}
}
else {
// get queriedSchema from requiredSchema
var queriedSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData)
// construct queryExpression based on queriedSchema and filters
val queryExpression = WideConverter.toQueryExpression(queriedSchema, filters)
val queryDataSet = readTsFile.query(queryExpression, file.start.asInstanceOf[java.lang.Long],
val queryDataSet = readTsFile.query(queryExpression,
file.start.asInstanceOf[java.lang.Long],
(file.start + file.length).asInstanceOf[java.lang.Long])
new Iterator[InternalRow] {
......
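The key partitioning idea in buildReader above is that each Spark partition forwards its byte range, file.start to file.start + file.length, down into the TsFile query, so each chunkgroup is consumed by exactly one partition. A hedged Java sketch of the same call pattern: the ReadOnlyTsFile.query(expression, start, end) overload appears in this diff, while the TsFileSequenceReader(String) constructor and the QueryExpression.create(paths, filter) factory are assumptions about the TsFile read API:

import java.util.Arrays;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;

public class PartitionRangeSketch {

  public static void main(String[] args) throws Exception {
    String tsfilePath = "interleaved.tsfile";
    long length = new java.io.File(tsfilePath).length();
    long mid = length / 2; // pretend Spark split the file into two partitions

    TsFileSequenceReader reader = new TsFileSequenceReader(tsfilePath); // assumed ctor
    ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
    // assumed factory; a null second argument means no value filter
    QueryExpression expression =
        QueryExpression.create(Arrays.asList(new Path("device_1.sensor_1")), null);

    long rows = 0;
    // run the same query against the two byte ranges; no row is counted twice
    for (long[] range : new long[][]{{0L, mid}, {mid, length}}) {
      QueryDataSet dataSet = readTsFile.query(expression, range[0], range[1]);
      while (dataSet.hasNext()) {
        dataSet.next();
        rows++;
      }
    }
    System.out.println("total rows over both partitions: " + rows);
    readTsFile.close();
  }
}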
......@@ -22,7 +22,6 @@ import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor
import org.apache.iotdb.tsfile.common.constant.QueryConstant
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData
import org.apache.iotdb.tsfile.file.metadata.enums.{TSDataType, TSEncoding}
......@@ -30,15 +29,13 @@ import org.apache.iotdb.tsfile.io.HDFSInput
import org.apache.iotdb.tsfile.qp.QueryProcessor
import org.apache.iotdb.tsfile.qp.common.{BasicOperator, FilterOperator, SQLConstant, TSQueryPlan}
import org.apache.iotdb.tsfile.read.TsFileSequenceReader
import org.apache.iotdb.tsfile.read.common.{Field, Path}
import org.apache.iotdb.tsfile.read.common.Path
import org.apache.iotdb.tsfile.read.expression.impl.{BinaryExpression, GlobalTimeExpression, SingleSeriesExpression}
import org.apache.iotdb.tsfile.read.expression.{IExpression, QueryExpression}
import org.apache.iotdb.tsfile.read.filter.{TimeFilter, ValueFilter}
import org.apache.iotdb.tsfile.utils.Binary
import org.apache.iotdb.tsfile.write.record.TSRecord
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint
import org.apache.iotdb.tsfile.write.schema.{MeasurementSchema, Schema, SchemaBuilder}
import org.apache.parquet.filter2.predicate.Operators.NotEq
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
......@@ -48,7 +45,8 @@ import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
* This object contains methods that are used to convert schema and data between SparkSQL and TSFile.
* This object contains methods that are used to convert schema and data between SparkSQL
* and TSFile.
*
*/
object NarrowConverter extends Converter {
......@@ -81,7 +79,7 @@ object NarrowConverter extends Converter {
}
})
in.close()
reader.close()
})
unionSeries
......@@ -95,7 +93,8 @@ object NarrowConverter extends Converter {
* @param addTimeField true to add a time field; false otherwise
* @return the converted list of fields
*/
override def toSqlField(tsfileSchema: util.ArrayList[Series], addTimeField: Boolean): ListBuffer[StructField] = {
override def toSqlField(tsfileSchema: util.ArrayList[Series], addTimeField: Boolean):
ListBuffer[StructField] = {
val fields = new ListBuffer[StructField]()
if (addTimeField) {
......@@ -129,7 +128,8 @@ object NarrowConverter extends Converter {
var queriedSchema: StructType = new StructType()
if (requiredSchema.isEmpty
|| (requiredSchema.size == 1 && requiredSchema.iterator.next().name == QueryConstant.RESERVED_TIME)) {
|| (requiredSchema.size == 1 && requiredSchema.iterator.next().name ==
QueryConstant.RESERVED_TIME)) {
// for example, (i) select count(*) from table; (ii) select time from table
val fileSchema = WideConverter.getSeries(tsFileMetaData)
......@@ -229,7 +229,8 @@ object NarrowConverter extends Converter {
}
if (valueFilter != null) {
if (finalFilter != null) {
finalFilter = BinaryExpression.and(finalFilter, transformFilterToExpression(schema, valueFilter, deviceName))
finalFilter = BinaryExpression.and(finalFilter, transformFilterToExpression(schema,
valueFilter, deviceName))
}
else {
finalFilter = transformFilterToExpression(schema, valueFilter, deviceName)
......@@ -274,7 +275,8 @@ object NarrowConverter extends Converter {
operator
case node: LessThanOrEqual =>
operator = new BasicOperator(SQLConstant.LESSTHANOREQUALTO, node.attribute, node.value.toString)
operator = new BasicOperator(SQLConstant.LESSTHANOREQUALTO, node.attribute,
node.value.toString)
operator
case node: GreaterThan =>
......@@ -282,7 +284,8 @@ object NarrowConverter extends Converter {
operator
case node: GreaterThanOrEqual =>
operator = new BasicOperator(SQLConstant.GREATERTHANOREQUALTO, node.attribute, node.value.toString)
operator = new BasicOperator(SQLConstant.GREATERTHANOREQUALTO, node.attribute,
node.value.toString)
operator
case _ =>
......@@ -297,7 +300,8 @@ object NarrowConverter extends Converter {
* @param node filter tree's node
* @return TSFile filter expression
*/
private def transformFilterToExpression(schema: StructType, node: FilterOperator, device_name: String): IExpression = {
private def transformFilterToExpression(schema: StructType, node: FilterOperator,
device_name: String): IExpression = {
var filter: IExpression = null
node.getTokenIntType match {
case SQLConstant.KW_NOT =>
......@@ -309,7 +313,8 @@ object NarrowConverter extends Converter {
filter = transformFilterToExpression(schema, child, device_name)
}
else {
filter = BinaryExpression.and(filter, transformFilterToExpression(schema, child, device_name))
filter = BinaryExpression.and(filter, transformFilterToExpression(schema, child,
device_name))
}
})
filter
......@@ -320,7 +325,8 @@ object NarrowConverter extends Converter {
filter = transformFilterToExpression(schema, child, device_name)
}
else {
filter = BinaryExpression.or(filter, transformFilterToExpression(schema, child, device_name))
filter = BinaryExpression.or(filter, transformFilterToExpression(schema, child,
device_name))
}
})
filter
......@@ -329,45 +335,55 @@ object NarrowConverter extends Converter {
case SQLConstant.EQUAL =>
val basicOperator = node.asInstanceOf[BasicOperator]
if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.eq(java.lang.Long.parseLong(basicOperator.getSeriesValue)))
filter = new GlobalTimeExpression(TimeFilter.eq(java.lang.Long.parseLong(
basicOperator.getSeriesValue)))
} else {
filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.Eq, device_name)
filter = constructExpression(schema, basicOperator.getSeriesPath,
basicOperator.getSeriesValue, FilterTypes.Eq, device_name)
}
filter
case SQLConstant.LESSTHAN =>
val basicOperator = node.asInstanceOf[BasicOperator]
if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.lt(java.lang.Long.parseLong(basicOperator.getSeriesValue)))
filter = new GlobalTimeExpression(TimeFilter.lt(java.lang.Long.parseLong(
basicOperator.getSeriesValue)))
} else {
filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.Lt, device_name)
filter = constructExpression(schema, basicOperator.getSeriesPath,
basicOperator.getSeriesValue, FilterTypes.Lt, device_name)
}
filter
case SQLConstant.LESSTHANOREQUALTO =>
val basicOperator = node.asInstanceOf[BasicOperator]
if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.ltEq(java.lang.Long.parseLong(basicOperator.getSeriesValue)))
filter = new GlobalTimeExpression(TimeFilter.ltEq(java.lang.Long.parseLong(
basicOperator.getSeriesValue)))
} else {
filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.LtEq, device_name)
filter = constructExpression(schema, basicOperator.getSeriesPath,
basicOperator.getSeriesValue, FilterTypes.LtEq, device_name)
}
filter
case SQLConstant.GREATERTHAN =>
val basicOperator = node.asInstanceOf[BasicOperator]
if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.gt(java.lang.Long.parseLong(basicOperator.getSeriesValue)))
filter = new GlobalTimeExpression(TimeFilter.gt(java.lang.Long.parseLong(
basicOperator.getSeriesValue)))
} else {
filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.Gt, device_name)
filter = constructExpression(schema, basicOperator.getSeriesPath,
basicOperator.getSeriesValue, FilterTypes.Gt, device_name)
}
filter
case SQLConstant.GREATERTHANOREQUALTO =>
val basicOperator = node.asInstanceOf[BasicOperator]
if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.gtEq(java.lang.Long.parseLong(basicOperator.getSeriesValue)))
filter = new GlobalTimeExpression(TimeFilter.gtEq(java.lang.Long.parseLong(
basicOperator.getSeriesValue)))
} else {
filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.GtEq, device_name)
filter = constructExpression(schema, basicOperator.getSeriesPath,
basicOperator.getSeriesValue, FilterTypes.GtEq, device_name)
}
filter
......@@ -377,12 +393,14 @@ object NarrowConverter extends Converter {
}
def constructExpression(schema: StructType, nodeName: String, nodeValue: String, filterType: FilterTypes.Value, device_name: String): IExpression = {
def constructExpression(schema: StructType, nodeName: String, nodeValue: String,
filterType: FilterTypes.Value, device_name: String): IExpression = {
val fieldNames = schema.fieldNames
val index = fieldNames.indexOf(nodeName)
if (index == -1) {
// placeholder for an invalid filter in the current TsFile
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), null)
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName), null)
filter
} else {
val dataType = schema.get(index).dataType
......@@ -391,27 +409,33 @@ object NarrowConverter extends Converter {
case FilterTypes.Eq =>
dataType match {
case BooleanType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.eq(new java.lang.Boolean(nodeValue)))
filter
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.eq(new java.lang.Integer(nodeValue)))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.eq(new java.lang.Long(nodeValue)))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.eq(new java.lang.Float(nodeValue)))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.eq(new java.lang.Double(nodeValue)))
filter
case StringType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.eq(nodeValue))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
......@@ -419,19 +443,23 @@ object NarrowConverter extends Converter {
case FilterTypes.Gt =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.gt(new java.lang.Integer(nodeValue)))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.gt(new java.lang.Long(nodeValue)))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.gt(new java.lang.Float(nodeValue)))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.gt(new java.lang.Double(nodeValue)))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
......@@ -439,19 +467,23 @@ object NarrowConverter extends Converter {
case FilterTypes.GtEq =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.gtEq(new java.lang.Integer(nodeValue)))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.gtEq(new java.lang.Long(nodeValue)))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.gtEq(new java.lang.Float(nodeValue)))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.gtEq(new java.lang.Double(nodeValue)))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
......@@ -459,19 +491,23 @@ object NarrowConverter extends Converter {
case FilterTypes.Lt =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.lt(new java.lang.Integer(nodeValue)))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.lt(new java.lang.Long(nodeValue)))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.lt(new java.lang.Float(nodeValue)))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.lt(new java.lang.Double(nodeValue)))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
......@@ -479,19 +515,23 @@ object NarrowConverter extends Converter {
case FilterTypes.LtEq =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.ltEq(new java.lang.Integer(nodeValue)))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.ltEq(new java.lang.Long(nodeValue)))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.ltEq(new java.lang.Float(nodeValue)))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName),
val filter = new SingleSeriesExpression(new Path(device_name +
SQLConstant.PATH_SEPARATOR + nodeName),
ValueFilter.ltEq(new java.lang.Double(nodeValue)))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
......
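Every branch of constructExpression above follows one pattern: prefix the measurement with the current device to get a full series path, pick a ValueFilter whose boxed type matches the Spark column type, and wrap both in a SingleSeriesExpression; time predicates become a GlobalTimeExpression instead, and BinaryExpression glues the pieces together. A minimal sketch using only classes as they are used in this diff:

import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;

public class ExpressionSketch {

  public static void main(String[] args) {
    // time > 100
    IExpression timeExpr = new GlobalTimeExpression(TimeFilter.gt(100L));
    // device_1.sensor_1 > 10 (an INT32 series, so an Integer-typed value filter)
    IExpression valueExpr = new SingleSeriesExpression(
        new Path("device_1.sensor_1"),
        ValueFilter.gt(new java.lang.Integer("10")));
    // conjunction of the two, as in the EQUAL/LESSTHAN/... cases above
    IExpression finalFilter = BinaryExpression.and(timeExpr, valueExpr);
    System.out.println(finalFilter);
  }
}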
......@@ -22,9 +22,9 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.iotdb.tsfile.io.TsFileOutputFormat
import org.apache.iotdb.tsfile.write.record.TSRecord
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.InternalRow
private[tsfile] class NarrowTsFileOutputWriter(
pathStr: String,
......
......@@ -19,8 +19,8 @@
package org.apache.iotdb.tsfile
import org.apache.iotdb.tsfile.common.constant.QueryConstant
import org.apache.spark.sql.{SparkSession, _}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession, _}
object Transformer {
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
......@@ -22,20 +22,19 @@ import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor
import org.apache.iotdb.tsfile.common.constant.QueryConstant
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData
import org.apache.iotdb.tsfile.file.metadata.enums.{TSDataType, TSEncoding}
import org.apache.iotdb.tsfile.io.HDFSInput
import org.apache.iotdb.tsfile.read.TsFileSequenceReader
import org.apache.iotdb.tsfile.read.common.{Field, Path}
import org.apache.iotdb.tsfile.read.common.Path
import org.apache.iotdb.tsfile.read.expression.impl.{BinaryExpression, GlobalTimeExpression, SingleSeriesExpression}
import org.apache.iotdb.tsfile.read.expression.{IExpression, QueryExpression}
import org.apache.iotdb.tsfile.read.filter.{TimeFilter, ValueFilter}
import org.apache.iotdb.tsfile.utils.Binary
import org.apache.iotdb.tsfile.write.record.TSRecord
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint
import org.apache.iotdb.tsfile.write.schema.{Schema, MeasurementSchema, SchemaBuilder}
import org.apache.iotdb.tsfile.write.schema.{MeasurementSchema, Schema, SchemaBuilder}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
......@@ -46,10 +45,11 @@ import scala.collection.mutable.ListBuffer
/**
* This object contains methods that are used to convert schema and data between SparkSQL and TSFile.
* This object contains methods that are used to convert schema and data between SparkSQL
* and TSFile.
*
*/
object WideConverter extends Converter{
object WideConverter extends Converter {
/**
* Get series from the given tsFileMetaData.
......@@ -103,7 +103,7 @@ object WideConverter extends Converter{
}
})
})
in.close()
reader.close()
})
unionSeries
......@@ -120,7 +120,8 @@ object WideConverter extends Converter{
var queriedSchema: StructType = new StructType()
if (requiredSchema.isEmpty
|| (requiredSchema.size == 1 && requiredSchema.iterator.next().name == QueryConstant.RESERVED_TIME)) {
|| (requiredSchema.size == 1 && requiredSchema.iterator.next().name ==
QueryConstant.RESERVED_TIME)) {
// for example, (i) select count(*) from table; (ii) select time from table
val fileSchema = WideConverter.getSeries(tsFileMetaData)
......@@ -196,7 +197,8 @@ object WideConverter extends Converter{
* @param addTimeField true to add a time field; false otherwise
* @return the converted list of fields
*/
def toSqlField(tsfileSchema: util.ArrayList[Series], addTimeField: Boolean): ListBuffer[StructField] = {
def toSqlField(tsfileSchema: util.ArrayList[Series], addTimeField: Boolean):
ListBuffer[StructField] = {
val fields = new ListBuffer[StructField]()
if (addTimeField) {
......@@ -232,11 +234,13 @@ object WideConverter extends Converter{
throw new Exception("NOT filter is not supported now")
case node: And =>
filter = BinaryExpression.and(transformFilter(schema, node.left), transformFilter(schema, node.right))
filter = BinaryExpression.and(transformFilter(schema, node.left), transformFilter(schema,
node.right))
filter
case node: Or =>
filter = BinaryExpression.or(transformFilter(schema, node.left), transformFilter(schema, node.right))
filter = BinaryExpression.or(transformFilter(schema, node.left), transformFilter(schema,
node.right))
filter
case node: EqualTo =>
......@@ -284,7 +288,8 @@ object WideConverter extends Converter{
}
}
def constructFilter(schema: StructType, nodeName: String, nodeValue: Any, filterType: FilterTypes.Value): IExpression = {
def constructFilter(schema: StructType, nodeName: String, nodeValue: Any,
filterType: FilterTypes.Value): IExpression = {
val fieldNames = schema.fieldNames
val index = fieldNames.indexOf(nodeName)
if (index == -1) {
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
......@@ -22,15 +22,15 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.iotdb.tsfile.io.TsFileOutputFormat
import org.apache.iotdb.tsfile.write.record.TSRecord
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.InternalRow
private[tsfile] class WideTsFileOutputWriter(
pathStr: String,
dataSchema: StructType,
options: Map[String, String],
context: TaskAttemptContext) extends OutputWriter {
pathStr: String,
dataSchema: StructType,
options: Map[String, String],
context: TaskAttemptContext) extends OutputWriter {
private val recordWriter: RecordWriter[NullWritable, TSRecord] = {
val fileSchema = WideConverter.toTsFileSchema(dataSchema, options)
......
......@@ -19,19 +19,21 @@
package org.apache.iotdb
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SparkSession}
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}
package object tsfile {
/**
* add a method 'tsfile' to DataFrameReader to read tsfile
*
* @param reader dataframeReader
*/
implicit class TsFileDataFrameReader(reader: DataFrameReader) {
def tsfile(path: String,
isNarrowForm: Boolean = false): DataFrame = {
if (isNarrowForm) {
reader.option(DefaultSource.path, path).option(DefaultSource.isNarrowForm, "narrow_form").format("org.apache.iotdb.tsfile").load
reader.option(DefaultSource.path, path).option(DefaultSource.isNarrowForm, "narrow_form").
format("org.apache.iotdb.tsfile").load
}
else {
reader.option(DefaultSource.path, path).format("org.apache.iotdb.tsfile").load
......@@ -46,7 +48,8 @@ package object tsfile {
def tsfile(path: String,
isNarrowForm: Boolean = false): Unit = {
if (isNarrowForm) {
writer.option(DefaultSource.path, path).option(DefaultSource.isNarrowForm, "narrow_form").format("org.apache.iotdb.tsfile").save
writer.option(DefaultSource.path, path).option(DefaultSource.isNarrowForm, "narrow_form").
format("org.apache.iotdb.tsfile").save
}
else {
writer.option(DefaultSource.path, path).format("org.apache.iotdb.tsfile").save
......
......@@ -19,6 +19,7 @@
package org.apache.iotdb.tool;
import java.io.File;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
......@@ -212,4 +213,58 @@ public class TsFileWriteTool {
// close TsFile
tsFileWriter.close();
}
/**
* Create a tsfile that contains data from device_1 and device_2;
* device_1's chunkgroups and device_2's chunkgroups are interleaved.
*
* Below is the created tsfile's statistics in one case. Note that the absolute values may change
* under different tsfile versions or hardware environments.
*
* | byte range | device.chunkgroup | time range of this chunkgroup |
* | [12,290545] | device_1.chunkgroup_1 | [0,131044] |
* | [290545,1042103] | device_2.chunkgroup_1 | [0,262088] |
* | [1042103,1345041] | device_1.chunkgroup_2 | [131045,254915] |
* | [1345041,2110092] | device_2.chunkgroup_2 | [262090,509830] |
* | [2110092,2421676] | device_1.chunkgroup_3 | [254916,376146] |
* | [2421676,3194704] | device_2.chunkgroup_3 | [509832,752292] |
* | [3194704,3256098] | device_1.chunkgroup_4 | [376147,399999] |
* | [3256098,3410323] | device_2.chunkgroup_4 | [752294,799998] |
*
*/
public void create4(String tsfilePath) throws Exception {
TSFileDescriptor.getInstance().getConfig().groupSizeInByte = 1024 * 1024;
File f = new File(tsfilePath);
if (f.exists()) {
f.delete();
}
TsFileWriter tsFileWriter = new TsFileWriter(f);
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT32, TSEncoding.RLE));
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.FLOAT, TSEncoding.RLE));
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.BOOLEAN, TSEncoding.RLE));
int j = 0;
for (int i = 0; i < 400000; i++) {
TSRecord tsRecord_d1 = new TSRecord(i, "device_1");
DataPoint dPoint = new IntDataPoint("sensor_1", i);
tsRecord_d1.addTuple(dPoint);
tsFileWriter.write(tsRecord_d1);
j = i * 2; // mimic devices at two different sample rates
TSRecord tsRecord_d2 = new TSRecord(j, "device_2");
DataPoint dPoint1 = new IntDataPoint("sensor_1", j);
DataPoint dPoint2 = new FloatDataPoint("sensor_2", (float) j);
DataPoint dPoint3 = new BooleanDataPoint("sensor_3", j % 2 == 0);
tsRecord_d2.addTuple(dPoint1);
tsRecord_d2.addTuple(dPoint2);
tsRecord_d2.addTuple(dPoint3);
tsFileWriter.write(tsRecord_d2);
}
tsFileWriter.close();
}
}
\ No newline at end of file
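A hypothetical check (not part of this patch) of what create4 is for: because the two devices' chunkgroups interleave, any byte-range split should see chunkgroups of both devices. It assumes TsFileSequenceReader can be opened from a local path; getDeviceNameInRange is used the same way in PhysicalOptimizer above:

import java.util.List;
import org.apache.iotdb.tool.TsFileWriteTool;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;

public class Create4PartitionCheck {

  public static void main(String[] args) throws Exception {
    String path = "interleaved.tsfile";
    new TsFileWriteTool().create4(path);

    long length = new java.io.File(path).length();
    TsFileSequenceReader reader = new TsFileSequenceReader(path); // assumed ctor
    // ask which devices own chunkgroups in each half of the file
    List<String> firstHalf = reader.getDeviceNameInRange(0L, length / 2);
    List<String> secondHalf = reader.getDeviceNameInRange(length / 2, length);
    // with interleaved chunkgroups, both halves should report both devices
    System.out.println("first half: " + firstHalf + ", second half: " + secondHalf);
    reader.close();
  }
}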
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
......