Commit 1ba2493b authored by 151250176

merge wide and narrow converters together and update doc

Parent efabf333
@@ -35,8 +35,8 @@
- Example 1: read from the local file system
- Example 2: read from the hadoop file system
- Example 3: read from a specific directory
- Example 4: query in wide form
- Example 5: query in narrow form
- Example 6: write
- Appendix A: Old Design of Schema Inference
- the default way
@@ -144,7 +144,7 @@ The corresponding SparkSQL table is as follows:
| 5 | null | null | null | null | false | null |
| 6 | null | null | ccc | null | null | null |
You can also use the narrow table form, as shown below (see part 6 for how to use the narrow form):
| time | device_name | status | hardware | temperature |
|------|-------------|--------|----------|-------------|
@@ -167,22 +167,22 @@ NOTE: Remember to assign necessary read and write permissions in advance.
```scala
import org.apache.iotdb.tsfile._
val wide_df = spark.read.tsfile("test.tsfile")
wide_df.show
val narrow_df = spark.read.tsfile("test.tsfile", true)
narrow_df.show
```
### Example 2: read from the hadoop file system
```scala
import org.apache.iotdb.tsfile._
val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
wide_df.show
val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
narrow_df.show
```
### Example 3: read from a specific directory
@@ -197,7 +197,7 @@ Note 1: Global time ordering of all TsFiles in a directory is not supported now.
Note 2: Measurements of the same name should have the same schema.
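The code for Example 3 is collapsed in this diff view. For reference, a minimal sketch of reading every TsFile under a directory; the directory path used here is illustrative only:

```scala
import org.apache.iotdb.tsfile._

// Reading a directory unions the series found in all TsFiles it contains
// (wide form by default; pass true as the second argument for narrow form).
val df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop")
df.show
```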
### Example 4: query in wide form
```scala
import org.apache.iotdb.tsfile._
@@ -215,7 +215,7 @@ val newDf = spark.sql("select count(*) from tsfile_table")
newDf.show
```
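Part of Example 4 is collapsed above. A minimal sketch of a wide-form query, assuming the temp-view name `tsfile_table` and an illustrative column `device_1.sensor_1`; because wide-form column names contain dots, they must be quoted with backticks in SparkSQL:

```scala
import org.apache.iotdb.tsfile._

// Wide form: one column per full series name, e.g. `device_1.sensor_1`.
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1` > 1.2")
newDf.show
```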
### Example 5: query in narrow form
```scala
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
@@ -235,7 +235,7 @@ newDf.show
```
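The middle of Example 5 is likewise collapsed. A minimal sketch of a narrow-form query, assuming the temp-view name `tsfile_table` and the narrow columns (`time`, `device_name`, `status`, `hardware`, `temperature`) shown earlier; the device name value is illustrative:

```scala
import org.apache.iotdb.tsfile._

// Narrow form: a device_name column plus one column per measurement,
// instead of one column per full series path.
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where device_name = 'root.ln.wf02.wt02' and temperature > 5")
newDf.show
```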
### Example 6: write
```scala
// only the wide-form table is supported for writing
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
```
......
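The tail of Example 6 is collapsed above. A minimal sketch of how the write step might look, assuming the connector exposes a `df.write.tsfile(...)` helper and using an illustrative output path:

```scala
import org.apache.iotdb.tsfile._

// Only the wide-form table can be written back as a TsFile.
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
df.write.tsfile("hdfs://localhost:9000/output.tsfile")
```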
@@ -43,7 +43,6 @@ import java.util.Map;
* e.g.
* TSFile's SQL: select s1,s2 from root.car.d1 where s1 = 10
* SparkSQL's SQL: select s1,s2 from XXX where delta_object = d1
*/
public class QueryProcessor {
@@ -53,7 +52,7 @@ public class QueryProcessor {
List<TSQueryPlan> queryPlans = new ArrayList<>();
if (filter != null) {
RemoveNotOptimizer removeNot = new RemoveNotOptimizer();
filter = removeNot.optimize(filter);
@@ -78,11 +77,10 @@ public class QueryProcessor {
}
// merge query plan
Map<List<String>, List<TSQueryPlan>> pathMap = new HashMap<>();
for (TSQueryPlan tsQueryPlan : queryPlans) {
if (pathMap.containsKey(tsQueryPlan.getPaths())) {
pathMap.get(tsQueryPlan.getPaths()).add(tsQueryPlan);
} else {
List<TSQueryPlan> plans = new ArrayList<>();
plans.add(tsQueryPlan);
pathMap.put(tsQueryPlan.getPaths(), plans);
@@ -91,13 +89,12 @@ public class QueryProcessor {
queryPlans.clear();
for (List<TSQueryPlan> plans : pathMap.values()) {
TSQueryPlan mergePlan = null;
for (TSQueryPlan plan : plans) {
if (mergePlan == null) {
mergePlan = plan;
} else {
FilterOperator timeFilterOperator = new FilterOperator(SQLConstant.KW_OR);
List<FilterOperator> timeFilterChildren = new ArrayList<>();
timeFilterChildren.add(mergePlan.getTimeFilterOperator());
@@ -115,8 +112,6 @@ public class QueryProcessor {
}
queryPlans.add(mergePlan);
}
return queryPlans;
}
@@ -148,7 +143,7 @@ public class QueryProcessor {
singleFilterList = filterOperator.getChildren();
}
if (singleFilterList == null) {
return null;
}
@@ -159,7 +154,7 @@ public class QueryProcessor {
} else {
String singlePath = child.getSinglePath();
if (columnNames.contains(singlePath)) {
if (!columnFilterOperators.contains(child))
columnFilterOperators.add(child);
else
throw new QueryOperatorException(
......
@@ -59,15 +59,15 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
options.getOrElse(DefaultSource.path, throw new TSFileDataSourceException(s"${DefaultSource.path} must be specified for org.apache.iotdb.tsfile DataSource"))
if(options.getOrElse(DefaultSource.isNewForm, "").equals("new_form")){
val tsfileSchema = NarrowConverter.getUnionSeries(files, conf)
NarrowConverter.toSqlSchema(tsfileSchema)
}
else{
//get union series in TsFile
val tsfileSchema = WideConverter.getUnionSeries(files, conf)
WideConverter.toSqlSchema(tsfileSchema)
}
}
@@ -108,14 +108,14 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
val tsFileMetaData = reader.readFileMetadata
// get queriedSchema from requiredSchema
var queriedSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData)
val readTsFile: ReadOnlyTsFile = new ReadOnlyTsFile(reader)
if (options.getOrElse(DefaultSource.isNewForm, "").equals("new_form")) {
val device_names = tsFileMetaData.getDeviceMap.keySet()
val measurement_names = tsFileMetaData.getMeasurementSchema.keySet()
// construct queryExpression based on queriedSchema and filters
val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, device_names, measurement_names, filters, reader, file.start.asInstanceOf[java.lang.Long], (file.start + file.length).asInstanceOf[java.lang.Long])
val queryDataSets = Executor.query(readTsFile, queryExpressions, file.start.asInstanceOf[java.lang.Long], (file.start + file.length).asInstanceOf[java.lang.Long])
var queryDataSet : QueryDataSet = null
@@ -166,16 +166,16 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
if (field.name == QueryConstant.RESERVED_TIME) {
rowBuffer(index) = curRecord.getTimestamp
}
else if(field.name == NarrowConverter.DEVICE_NAME){
rowBuffer(index) = device_name
}
else {
val pos = paths.indexOf(new org.apache.iotdb.tsfile.read.common.Path(device_name, field.name))
var curField: Field = null
if (pos != -1) {
curField = fields.get(pos)
}
rowBuffer(index) = NarrowConverter.toSqlValue(curField)
}
index += 1
})
@@ -186,7 +186,7 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
}
else {
// construct queryExpression based on queriedSchema and filters
val queryExpression = WideConverter.toQueryExpression(queriedSchema, filters)
val queryDataSet = readTsFile.query(queryExpression, file.start.asInstanceOf[java.lang.Long],
@@ -222,7 +222,7 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
if (pos != -1) {
curField = fields.get(pos)
}
rowBuffer(index) = WideConverter.toSqlValue(curField)
}
index += 1
})
......
@@ -33,13 +33,13 @@ private[tsfile] class TsFileOutputWriter(
context: TaskAttemptContext) extends OutputWriter {
private val recordWriter: RecordWriter[NullWritable, TSRecord] = {
val fileSchema = WideConverter.toTsFileSchema(dataSchema, options)
new TsFileOutputFormat(fileSchema).getRecordWriter(context)
}
override def write(row: InternalRow): Unit = {
if (row != null) {
val tsRecord = WideConverter.toTsRecord(row, dataSchema)
tsRecord.foreach(r => {
recordWriter.write(NullWritable.get(), r)
})
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor
import org.apache.iotdb.tsfile.common.constant.QueryConstant
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData
import org.apache.iotdb.tsfile.file.metadata.enums.{TSDataType, TSEncoding}
import org.apache.iotdb.tsfile.io.HDFSInput
import org.apache.iotdb.tsfile.read.TsFileSequenceReader
import org.apache.iotdb.tsfile.read.common.{Field, Path}
import org.apache.iotdb.tsfile.read.expression.impl.{BinaryExpression, GlobalTimeExpression, SingleSeriesExpression}
import org.apache.iotdb.tsfile.read.expression.{IExpression, QueryExpression}
import org.apache.iotdb.tsfile.read.filter.{TimeFilter, ValueFilter}
import org.apache.iotdb.tsfile.utils.Binary
import org.apache.iotdb.tsfile.write.record.TSRecord
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint
import org.apache.iotdb.tsfile.write.schema.{Schema, MeasurementSchema, SchemaBuilder}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
* This object contains methods that are used to convert schema and data between SparkSQL and TSFile.
*
*/
object WideConverter extends Converter {
/**
* Get series from the given tsFileMetaData.
*
* @param tsFileMetaData TsFileMetaData
* @return union series
*/
def getSeries(tsFileMetaData: TsFileMetaData): util.ArrayList[Series] = {
val series = new util.ArrayList[Series]()
val devices = tsFileMetaData.getDeviceMap.keySet()
val measurements = tsFileMetaData.getMeasurementSchema
devices.foreach(d => {
measurements.foreach(m => {
val fullPath = d + "." + m._1
series.add(new Series(fullPath, m._2.getType)
)
})
})
series
}
/**
* Get union series in all tsfiles.
* e.g. (tsfile1:s1,s2) & (tsfile2:s2,s3) = s1,s2,s3
*
* @param files tsfiles
* @param conf hadoop configuration
* @return union series
*/
def getUnionSeries(files: Seq[FileStatus], conf: Configuration): util.ArrayList[Series] = {
val unionSeries = new util.ArrayList[Series]()
var seriesSet: mutable.Set[String] = mutable.Set()
files.foreach(f => {
val in = new HDFSInput(f.getPath, conf)
val reader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val devices = tsFileMetaData.getDeviceMap.keySet()
val measurements = tsFileMetaData.getMeasurementSchema
devices.foreach(d => {
measurements.foreach(m => {
val fullPath = d + "." + m._1
if (!seriesSet.contains(fullPath)) {
seriesSet += fullPath
unionSeries.add(new Series(fullPath, m._2.getType)
)
}
})
})
in.close()
})
unionSeries
}
/**
* Prepare queriedSchema from requiredSchema.
*
* @param requiredSchema requiredSchema
* @param tsFileMetaData tsFileMetaData
* @return
*/
def prepSchema(requiredSchema: StructType, tsFileMetaData: TsFileMetaData): StructType = {
var queriedSchema: StructType = new StructType()
if (requiredSchema.isEmpty
|| (requiredSchema.size == 1 && requiredSchema.iterator.next().name == QueryConstant.RESERVED_TIME)) {
// for example, (i) select count(*) from table; (ii) select time from table
val fileSchema = WideConverter.getSeries(tsFileMetaData)
queriedSchema = StructType(toSqlField(fileSchema, false).toList)
} else { // Remove nonexistent schema according to the current file's metadata.
// This may happen when queried TsFiles in the same folder do not have the same schema.
val devices = tsFileMetaData.getDeviceMap.keySet()
val measurementIds = tsFileMetaData.getMeasurementSchema.keySet()
requiredSchema.foreach(f => {
if (!QueryConstant.RESERVED_TIME.equals(f.name)) {
val path = new org.apache.iotdb.tsfile.read.common.Path(f.name)
if (devices.contains(path.getDevice) && measurementIds.contains(path.getMeasurement)) {
queriedSchema = queriedSchema.add(f)
}
}
})
}
queriedSchema
}
/**
* Construct queryExpression based on queriedSchema and filters.
*
* @param schema selected columns
* @param filters filters
* @return query expression
*/
def toQueryExpression(schema: StructType, filters: Seq[Filter]): QueryExpression = {
//get paths from schema
val paths = new util.ArrayList[org.apache.iotdb.tsfile.read.common.Path]
schema.foreach(f => {
if (!QueryConstant.RESERVED_TIME.equals(f.name)) { // the time field is excluded
paths.add(new org.apache.iotdb.tsfile.read.common.Path(f.name))
}
})
//remove invalid filters
val validFilters = new ListBuffer[Filter]()
filters.foreach { f => {
if (isValidFilter(f))
validFilters.add(f)
}
}
if (validFilters.isEmpty) {
val queryExpression = QueryExpression.create(paths, null)
queryExpression
} else {
//construct filters to a binary tree
var filterTree = validFilters.get(0)
for (i <- 1 until validFilters.length) {
filterTree = And(filterTree, validFilters.get(i))
}
//convert filterTree to FilterOperator
val finalFilter = transformFilter(schema, filterTree)
// create query expression
val queryExpression = QueryExpression.create(paths, finalFilter)
queryExpression
}
}
/**
* Construct fields with the TSFile data type converted to the SparkSQL data type.
*
* @param tsfileSchema tsfileSchema
* @param addTimeField true to add a time field; false to not
* @return the converted list of fields
*/
def toSqlField(tsfileSchema: util.ArrayList[Series], addTimeField: Boolean): ListBuffer[StructField] = {
val fields = new ListBuffer[StructField]()
if (addTimeField) {
fields += StructField(QueryConstant.RESERVED_TIME, LongType, nullable = false)
}
tsfileSchema.foreach((series: Series) => {
fields += StructField(series.getName, series.getType match {
case TSDataType.BOOLEAN => BooleanType
case TSDataType.INT32 => IntegerType
case TSDataType.INT64 => LongType
case TSDataType.FLOAT => FloatType
case TSDataType.DOUBLE => DoubleType
case TSDataType.TEXT => StringType
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}, nullable = true)
})
fields
}
private def isValidFilter(filter: Filter): Boolean = {
filter match {
case f: EqualTo => true
case f: GreaterThan => true
case f: GreaterThanOrEqual => true
case f: LessThan => true
case f: LessThanOrEqual => true
case f: Or => isValidFilter(f.left) && isValidFilter(f.right)
case f: And => isValidFilter(f.left) && isValidFilter(f.right)
case f: Not => isValidFilter(f.child)
case _ => false
}
}
/**
* Transform SparkSQL's filter binary tree to TsFile's filter expression.
*
* @param schema to get relative columns' dataType information
* @param node filter tree's node
* @return TSFile filter expression
*/
private def transformFilter(schema: StructType, node: Filter): IExpression = {
var filter: IExpression = null
node match {
case node: Not =>
throw new Exception("NOT filter is not supported now")
case node: And =>
filter = BinaryExpression.and(transformFilter(schema, node.left), transformFilter(schema, node.right))
filter
case node: Or =>
filter = BinaryExpression.or(transformFilter(schema, node.left), transformFilter(schema, node.right))
filter
case node: EqualTo =>
if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.eq(node.value.asInstanceOf[java.lang.Long]))
} else {
filter = constructFilter(schema, node.attribute, node.value, FilterTypes.Eq)
}
filter
case node: LessThan =>
if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.lt(node.value.asInstanceOf[java.lang.Long]))
} else {
filter = constructFilter(schema, node.attribute, node.value, FilterTypes.Lt)
}
filter
case node: LessThanOrEqual =>
if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.ltEq(node.value.asInstanceOf[java.lang.Long]))
} else {
filter = constructFilter(schema, node.attribute, node.value, FilterTypes.LtEq)
}
filter
case node: GreaterThan =>
if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.gt(node.value.asInstanceOf[java.lang.Long]))
} else {
filter = constructFilter(schema, node.attribute, node.value, FilterTypes.Gt)
}
filter
case node: GreaterThanOrEqual =>
if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) {
filter = new GlobalTimeExpression(TimeFilter.gtEq(node.value.asInstanceOf[java.lang.Long]))
} else {
filter = constructFilter(schema, node.attribute, node.value, FilterTypes.GtEq)
}
filter
case other =>
throw new Exception(s"Unsupported filter $other")
}
}
def constructFilter(schema: StructType, nodeName: String, nodeValue: Any, filterType: FilterTypes.Value): IExpression = {
val fieldNames = schema.fieldNames
val index = fieldNames.indexOf(nodeName)
if (index == -1) {
// placeholder for an invalid filter in the current TsFile
val filter = new SingleSeriesExpression(new Path(nodeName), null)
filter
} else {
val dataType = schema.get(index).dataType
filterType match {
case FilterTypes.Eq =>
dataType match {
case BooleanType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Boolean]))
filter
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Integer]))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Long]))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Float]))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Double]))
filter
case StringType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.eq(new Binary(nodeValue.toString)))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
case FilterTypes.Gt =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.gt(nodeValue.asInstanceOf[java.lang.Integer]))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.gt(nodeValue.asInstanceOf[java.lang.Long]))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.gt(nodeValue.asInstanceOf[java.lang.Float]))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.gt(nodeValue.asInstanceOf[java.lang.Double]))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
case FilterTypes.GtEq =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.gtEq(nodeValue.asInstanceOf[java.lang.Integer]))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.gtEq(nodeValue.asInstanceOf[java.lang.Long]))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.gtEq(nodeValue.asInstanceOf[java.lang.Float]))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.gtEq(nodeValue.asInstanceOf[java.lang.Double]))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
case FilterTypes.Lt =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.lt(nodeValue.asInstanceOf[java.lang.Integer]))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.lt(nodeValue.asInstanceOf[java.lang.Long]))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.lt(nodeValue.asInstanceOf[java.lang.Float]))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.lt(nodeValue.asInstanceOf[java.lang.Double]))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
case FilterTypes.LtEq =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.ltEq(nodeValue.asInstanceOf[java.lang.Integer]))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.ltEq(nodeValue.asInstanceOf[java.lang.Long]))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.ltEq(nodeValue.asInstanceOf[java.lang.Float]))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(nodeName),
ValueFilter.ltEq(nodeValue.asInstanceOf[java.lang.Double]))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
}
}
}
}
@@ -87,7 +87,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val series = WideConverter.getSeries(tsFileMetaData)
Assert.assertEquals(6, series.size())
Assert.assertEquals("[device_1.sensor_3,INT32]", series.get(0).toString)
@@ -111,7 +111,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
val statusSeq: Seq[FileStatus] = Seq(status1, status2)
val tsfileSchema = WideConverter.getUnionSeries(statusSeq, conf)
Assert.assertEquals(tsfileSchema.size(), 6)
Assert.assertEquals("[device_1.sensor_3,INT32]", tsfileSchema.get(0).toString)
@@ -136,12 +136,12 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
val stringField = new Field(TSDataType.TEXT)
stringField.setBinaryV(new Binary("pass"))
Assert.assertEquals(WideConverter.toSqlValue(boolField), true)
Assert.assertEquals(WideConverter.toSqlValue(intField), 32)
Assert.assertEquals(WideConverter.toSqlValue(longField), 64l)
Assert.assertEquals(WideConverter.toSqlValue(floatField), 3.14f)
Assert.assertEquals(WideConverter.toSqlValue(doubleField), 0.618d)
Assert.assertEquals(WideConverter.toSqlValue(stringField), "pass")
}
test("testToSparkSqlSchema") {
@@ -153,7 +153,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
fields.add(new Series("device_2.sensor_1", TSDataType.FLOAT))
fields.add(new Series("device_2.sensor_2", TSDataType.INT32))
val sqlSchema = WideConverter.toSqlSchema(fields)
val expectedFields: util.ArrayList[StructField] = new util.ArrayList[StructField]()
expectedFields.add(StructField(QueryConstant.RESERVED_TIME, LongType, false))
@@ -179,7 +179,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
requiredFields.add(StructField("device_1.sensor_2", IntegerType, true))
val requiredSchema = StructType(requiredFields)
val filteredSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData)
Assert.assertEquals(3, filteredSchema.size)
val fields = filteredSchema.fields
@@ -199,7 +199,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
requiredFields.add(StructField(QueryConstant.RESERVED_TIME, LongType, false))
val requiredSchema = StructType(requiredFields)
val filteredSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData)
Assert.assertEquals(6, filteredSchema.size)
val fields = filteredSchema.fields
@@ -225,7 +225,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
val schema = StructType(fields)
val row: InternalRow = new GenericInternalRow(Array(1L, null, 1.2f, 20, 19, 2.3f, 11))
val records = WideConverter.toTsRecord(row, schema)
Assert.assertEquals(2, records.size)
Assert.assertEquals(1, records(0).time)
@@ -256,7 +256,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
val ft4 = LessThan("time", 4L)
val filters: Seq[Filter] = Seq(ft1, ft2_3, ft4)
val expression = WideConverter.toQueryExpression(schema, filters)
Assert.assertEquals(true, expression.hasQueryFilter)
Assert.assertEquals(2, expression.getSelectedSeries.size())
......