From 1ba2493b4c5c01dd8ee251ad78c4126ec213fa6e Mon Sep 17 00:00:00 2001 From: 151250176 <151250176@smail.nju.edu.cn> Date: Tue, 3 Sep 2019 17:36:45 +0800 Subject: [PATCH] merge wide and narrow convertor together and update doc --- docs/Documentation/UserGuide/9-Tools-spark.md | 28 +- .../iotdb/tsfile/qp/QueryProcessor.java | 25 +- .../apache/iotdb/tsfile/DefaultSource.scala | 22 +- .../apache/iotdb/tsfile/NarrowConverter.scala | 516 ++++++++++++++++++ .../iotdb/tsfile/TsFileOutputWriter.scala | 4 +- .../apache/iotdb/tsfile/WideConverter.scala | 423 ++++++++++++++ .../apache/iotdb/tsfile/ConverterTest.scala | 26 +- 7 files changed, 989 insertions(+), 55 deletions(-) create mode 100644 spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/NarrowConverter.scala create mode 100755 spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/WideConverter.scala diff --git a/docs/Documentation/UserGuide/9-Tools-spark.md b/docs/Documentation/UserGuide/9-Tools-spark.md index 4b191838b9..26388840c7 100644 --- a/docs/Documentation/UserGuide/9-Tools-spark.md +++ b/docs/Documentation/UserGuide/9-Tools-spark.md @@ -35,8 +35,8 @@ - Example 1: read from the local file system - Example 2: read from the hadoop file system - Example 3: read from a specific directory - - Example 4: query in old form - - Example 5: query in new form + - Example 4: query in wide form + - Example 5: query in narrow form - Example 6: write - Appendix A: Old Design of Schema Inference - the default way @@ -144,7 +144,7 @@ The corresponding SparkSQL table is as follows: | 5 | null | null | null | null | false | null | | 6 | null | null | ccc | null | null | null | -You can also use new table form which as follows: (You can see part 6 about how to use new form) +You can also use narrow table form which as follows: (You can see part 6 about how to use narrow form) | time | device_name | status | hardware | temperature | |------|-------------------------------|--------------------------|----------------------------|-------------------------------| @@ -167,22 +167,22 @@ NOTE: Remember to assign necessary read and write permissions in advance. ```scala import org.apache.iotdb.tsfile._ -val old_df = spark.read.tsfile("test.tsfile") -old_df.show +val wide_df = spark.read.tsfile("test.tsfile") +wide_df.show -val new_df = spark.read.tsfile("test.tsfile", true) -new_df.show +val narrow_df = spark.read.tsfile("test.tsfile", true) +narrow_df.show ``` ### Example 2: read from the hadoop file system ```scala import org.apache.iotdb.tsfile._ -val old_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") -old_df.show +val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") +wide_df.show -val new_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true) -new_df.show +val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true) +narrow_df.show ``` ### Example 3: read from a specific directory @@ -197,7 +197,7 @@ Note 1: Global time ordering of all TsFiles in a directory is not supported now. Note 2: Measurements of the same name should have the same schema. 
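The narrow form introduced above also works for directory reads. This is a minimal sketch, not part of the shipped examples; it simply combines the boolean flag from Example 1 with the HDFS directory path from Example 3, and uses `printSchema` to confirm the note that same-named measurements must share a schema:

```scala
import org.apache.iotdb.tsfile._

// Read every TsFile under the directory in narrow form
// (columns: time, device_name, then one column per measurement name).
val narrow_dir_df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop", true)
narrow_dir_df.printSchema()
narrow_dir_df.show()
```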
-### Example 4: query in old form +### Example 4: query in wide form ```scala import org.apache.iotdb.tsfile._ @@ -215,7 +215,7 @@ val newDf = spark.sql("select count(*) from tsfile_table") newDf.show ``` -### Example 5: query in new form +### Example 5: query in narrow form ```scala import org.apache.iotdb.tsfile._ val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true) @@ -235,7 +235,7 @@ newDf.show ### Example 6: write ```scala -// we only support old_form table to write +// we only support wide_form table to write import org.apache.iotdb.tsfile._ val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") diff --git a/spark-tsfile/src/main/java/org/apache/iotdb/tsfile/qp/QueryProcessor.java b/spark-tsfile/src/main/java/org/apache/iotdb/tsfile/qp/QueryProcessor.java index 27e2302ee3..34dd38fe78 100755 --- a/spark-tsfile/src/main/java/org/apache/iotdb/tsfile/qp/QueryProcessor.java +++ b/spark-tsfile/src/main/java/org/apache/iotdb/tsfile/qp/QueryProcessor.java @@ -43,7 +43,6 @@ import java.util.Map; * e.g. * TSFile's SQL: select s1,s2 from root.car.d1 where s1 = 10 * SparkSQL's SQL: select s1,s2 from XXX where delta_object = d1 - * */ public class QueryProcessor { @@ -53,7 +52,7 @@ public class QueryProcessor { List queryPlans = new ArrayList<>(); - if(filter != null) { + if (filter != null) { RemoveNotOptimizer removeNot = new RemoveNotOptimizer(); filter = removeNot.optimize(filter); @@ -78,11 +77,10 @@ public class QueryProcessor { } // merge query plan Map, List> pathMap = new HashMap<>(); - for(TSQueryPlan tsQueryPlan : queryPlans){ - if(pathMap.containsKey(tsQueryPlan.getPaths())){ + for (TSQueryPlan tsQueryPlan : queryPlans) { + if (pathMap.containsKey(tsQueryPlan.getPaths())) { pathMap.get(tsQueryPlan.getPaths()).add(tsQueryPlan); - } - else{ + } else { List plans = new ArrayList<>(); plans.add(tsQueryPlan); pathMap.put(tsQueryPlan.getPaths(), plans); @@ -91,13 +89,12 @@ public class QueryProcessor { queryPlans.clear(); - for(List plans : pathMap.values()){ + for (List plans : pathMap.values()) { TSQueryPlan mergePlan = null; - for(TSQueryPlan plan : plans){ - if(mergePlan == null){ + for (TSQueryPlan plan : plans) { + if (mergePlan == null) { mergePlan = plan; - } - else{ + } else { FilterOperator timeFilterOperator = new FilterOperator(SQLConstant.KW_OR); List timeFilterChildren = new ArrayList<>(); timeFilterChildren.add(mergePlan.getTimeFilterOperator()); @@ -115,8 +112,6 @@ public class QueryProcessor { } queryPlans.add(mergePlan); } - // - return queryPlans; } @@ -148,7 +143,7 @@ public class QueryProcessor { singleFilterList = filterOperator.getChildren(); } - if(singleFilterList == null) { + if (singleFilterList == null) { return null; } @@ -159,7 +154,7 @@ public class QueryProcessor { } else { String singlePath = child.getSinglePath(); if (columnNames.contains(singlePath)) { - if(!columnFilterOperators.contains(child)) + if (!columnFilterOperators.contains(child)) columnFilterOperators.add(child); else throw new QueryOperatorException( diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/DefaultSource.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/DefaultSource.scala index 06ebfb0328..e18e80eaf3 100755 --- a/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/DefaultSource.scala +++ b/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/DefaultSource.scala @@ -59,15 +59,15 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister { options.getOrElse(DefaultSource.path, throw new 
TSFileDataSourceException(s"${DefaultSource.path} must be specified for org.apache.iotdb.tsfile DataSource")) if(options.getOrElse(DefaultSource.isNewForm, "").equals("new_form")){ - val tsfileSchema = NewConverter.getUnionSeries(files, conf) + val tsfileSchema = NarrowConverter.getUnionSeries(files, conf) - NewConverter.toSqlSchema(tsfileSchema) + NarrowConverter.toSqlSchema(tsfileSchema) } else{ //get union series in TsFile - val tsfileSchema = Converter.getUnionSeries(files, conf) + val tsfileSchema = WideConverter.getUnionSeries(files, conf) - Converter.toSqlSchema(tsfileSchema) + WideConverter.toSqlSchema(tsfileSchema) } } @@ -108,14 +108,14 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister { val tsFileMetaData = reader.readFileMetadata // get queriedSchema from requiredSchema - var queriedSchema = Converter.prepSchema(requiredSchema, tsFileMetaData) + var queriedSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData) val readTsFile: ReadOnlyTsFile = new ReadOnlyTsFile(reader) if (options.getOrElse(DefaultSource.isNewForm, "").equals("new_form")) { val device_names = tsFileMetaData.getDeviceMap.keySet() val measurement_names = tsFileMetaData.getMeasurementSchema.keySet() // construct queryExpression based on queriedSchema and filters - val queryExpressions = NewConverter.toQueryExpression(dataSchema, device_names, measurement_names, filters, reader, file.start.asInstanceOf[java.lang.Long], (file.start + file.length).asInstanceOf[java.lang.Long]) + val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, device_names, measurement_names, filters, reader, file.start.asInstanceOf[java.lang.Long], (file.start + file.length).asInstanceOf[java.lang.Long]) val queryDataSets = Executor.query(readTsFile, queryExpressions, file.start.asInstanceOf[java.lang.Long], (file.start + file.length).asInstanceOf[java.lang.Long]) var queryDataSet : QueryDataSet = null @@ -166,16 +166,16 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister { if (field.name == QueryConstant.RESERVED_TIME) { rowBuffer(index) = curRecord.getTimestamp } - else if(field.name == NewConverter.DEVICE_NAME){ + else if(field.name == NarrowConverter.DEVICE_NAME){ rowBuffer(index) = device_name } else { - val pos = paths.indexOf(new org.apache.iotdb.tsfile.read.common.Path(device_name + "." 
+ field.name)) + val pos = paths.indexOf(new org.apache.iotdb.tsfile.read.common.Path(device_name, field.name)) var curField: Field = null if (pos != -1) { curField = fields.get(pos) } - rowBuffer(index) = NewConverter.toSqlValue(curField) + rowBuffer(index) = NarrowConverter.toSqlValue(curField) } index += 1 }) @@ -186,7 +186,7 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister { } else { // construct queryExpression based on queriedSchema and filters - val queryExpression = Converter.toQueryExpression(queriedSchema, filters) + val queryExpression = WideConverter.toQueryExpression(queriedSchema, filters) val queryDataSet = readTsFile.query(queryExpression, file.start.asInstanceOf[java.lang.Long], @@ -222,7 +222,7 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister { if (pos != -1) { curField = fields.get(pos) } - rowBuffer(index) = Converter.toSqlValue(curField) + rowBuffer(index) = WideConverter.toSqlValue(curField) } index += 1 }) diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/NarrowConverter.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/NarrowConverter.scala new file mode 100644 index 0000000000..04dffd28dd --- /dev/null +++ b/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/NarrowConverter.scala @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.tsfile + +import java.util + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor +import org.apache.iotdb.tsfile.common.constant.QueryConstant +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData +import org.apache.iotdb.tsfile.file.metadata.enums.{TSDataType, TSEncoding} +import org.apache.iotdb.tsfile.io.HDFSInput +import org.apache.iotdb.tsfile.qp.QueryProcessor +import org.apache.iotdb.tsfile.qp.common.{BasicOperator, FilterOperator, SQLConstant, TSQueryPlan} +import org.apache.iotdb.tsfile.read.TsFileSequenceReader +import org.apache.iotdb.tsfile.read.common.{Field, Path} +import org.apache.iotdb.tsfile.read.expression.impl.{BinaryExpression, GlobalTimeExpression, SingleSeriesExpression} +import org.apache.iotdb.tsfile.read.expression.{IExpression, QueryExpression} +import org.apache.iotdb.tsfile.read.filter.{TimeFilter, ValueFilter} +import org.apache.iotdb.tsfile.utils.Binary +import org.apache.iotdb.tsfile.write.record.TSRecord +import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint +import org.apache.iotdb.tsfile.write.schema.{MeasurementSchema, SchemaBuilder} +import org.apache.parquet.filter2.predicate.Operators.NotEq +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +/** + * This object contains methods that are used to convert schema and data between SparkSQL and TSFile. + * + */ +object NarrowConverter extends Converter{ + + val DEVICE_NAME = "device_name" + + /** + * Get union series in all tsfiles. + * e.g. (tsfile1:s1,s2) & (tsfile2:s2,s3) = s1,s2,s3 + * + * @param files tsfiles + * @param conf hadoop configuration + * @return union series + */ + def getUnionSeries(files: Seq[FileStatus], conf: Configuration): util.ArrayList[Series] = { + val unionSeries = new util.ArrayList[Series]() + var seriesSet: mutable.Set[String] = mutable.Set() + + files.foreach(f => { + val in = new HDFSInput(f.getPath, conf) + val reader = new TsFileSequenceReader(in) + val tsFileMetaData = reader.readFileMetadata + val measurements = tsFileMetaData.getMeasurementSchema + + measurements.foreach(m => { + if (!seriesSet.contains(m._1)) { + seriesSet += m._1 + unionSeries.add(new Series(m._1, m._2.getType) + ) + } + }) + + in.close() + }) + + unionSeries + } + + + /** + * Construct fields with the TSFile data type converted to the SparkSQL data type. 
+ * + * @param tsfileSchema tsfileSchema + * @param addTimeField true to add a time field; false to not + * @return the converted list of fields + */ + override def toSqlField(tsfileSchema: util.ArrayList[Series], addTimeField: Boolean): ListBuffer[StructField] = { + val fields = new ListBuffer[StructField]() + + if (addTimeField) { + fields += StructField(QueryConstant.RESERVED_TIME, LongType, nullable = false) + } + fields += StructField(DEVICE_NAME, StringType, nullable = false) + + tsfileSchema.foreach((series: Series) => { + fields += StructField(series.getName, series.getType match { + case TSDataType.BOOLEAN => BooleanType + case TSDataType.INT32 => IntegerType + case TSDataType.INT64 => LongType + case TSDataType.FLOAT => FloatType + case TSDataType.DOUBLE => DoubleType + case TSDataType.TEXT => StringType + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + }, nullable = true) + }) + + fields + } + + /** + * Prepare queriedSchema from requiredSchema. + * + * @param requiredSchema requiredSchema + * @param tsFileMetaData tsFileMetaData + * @return + */ + def prepSchema(requiredSchema: StructType, tsFileMetaData: TsFileMetaData): StructType = { + var queriedSchema: StructType = new StructType() + + if (requiredSchema.isEmpty + || (requiredSchema.size == 1 && requiredSchema.iterator.next().name == QueryConstant.RESERVED_TIME)) { + // for example, (i) select count(*) from table; (ii) select time from table + + val fileSchema = WideConverter.getSeries(tsFileMetaData) + queriedSchema = StructType(toSqlField(fileSchema, false).toList) + + } else { + // Remove nonexistent schema according to the current file's metadata. + // This may happen when queried TsFiles in the same folder do not have the same schema. + + val measurementIds = tsFileMetaData.getMeasurementSchema.keySet() + requiredSchema.foreach(f => { + if (!QueryConstant.RESERVED_TIME.equals(f.name) && !DEVICE_NAME.equals(f.name)) { + if (measurementIds.contains(f.name)) { + queriedSchema = queriedSchema.add(f) + } + } + }) + + } + + queriedSchema + } + + + /** + * Construct queryExpression based on queriedSchema and filters. 
+ * + * @param schema schema + * @param device_name device_names + * @param measurement_name measurement_names + * @return query expression + */ + def toQueryExpression(schema: StructType, + device_name: util.Set[String], + measurement_name: util.Set[String], + filters: Seq[Filter], + in: TsFileSequenceReader, + start: java.lang.Long, + end: java.lang.Long): util.ArrayList[QueryExpression] = { + // build filter + var finalFilter: FilterOperator = null + //remove invalid filters + val validFilters = new ListBuffer[Filter]() + //query processor + val queryProcessor = new QueryProcessor() + filters.foreach(f => { + if (isValidFilter(f)) + validFilters.add(f) + } + ) + if (validFilters.nonEmpty) { + //construct filters to a binary tree + var filterTree = validFilters.get(0) + for (i <- 1 until validFilters.length) { + filterTree = And(filterTree, validFilters.get(i)) + } + + //convert filterTree to FilterOperator + finalFilter = transformFilter(filterTree) + } + + //get paths from device name and measurement name + val res = new util.ArrayList[QueryExpression] + val paths = new util.ArrayList[String](measurement_name) + + val columnNames = new util.ArrayList[String]() + columnNames += DEVICE_NAME + val queryPlans = queryProcessor.generatePlans(finalFilter, paths, columnNames, in, start, end) + + queryPlans.foreach(plan => { + res.add(queryToExpression(schema, plan)) + }) + + res + } + + /** + * Used in toQueryConfigs() to convert one query plan to one QueryConfig. + * + * @param queryPlan TsFile logical query plan + * @return TsFile physical query plan + */ + private def queryToExpression(schema: StructType, queryPlan: TSQueryPlan): QueryExpression = { + val selectedColumns = queryPlan.getPaths + val timeFilter = queryPlan.getTimeFilterOperator + val valueFilter = queryPlan.getValueFilterOperator + + val paths = new util.ArrayList[Path]() + selectedColumns.foreach(path => { + paths.add(new Path(path)) + }) + + val deviceName = paths.get(0).getDevice + var finalFilter: IExpression = null + if (timeFilter != null) { + finalFilter = transformFilterToExpression(schema, timeFilter, deviceName) + } + if (valueFilter != null) { + if (finalFilter != null) { + finalFilter = BinaryExpression.and(finalFilter, transformFilterToExpression(schema, valueFilter, deviceName)) + } + else { + finalFilter = transformFilterToExpression(schema, valueFilter, deviceName) + } + } + + QueryExpression.create(paths, finalFilter) + } + + private def isValidFilter(filter: Filter): Boolean = { + filter match { + case f: EqualTo => true + case f: GreaterThan => true + case f: GreaterThanOrEqual => true + case f: LessThan => true + case f: LessThanOrEqual => true + case f: Or => isValidFilter(f.left) && isValidFilter(f.right) + case f: And => isValidFilter(f.left) && isValidFilter(f.right) + case f: Not => isValidFilter(f.child) + case _ => false + } + } + + /** + * Transform sparkSQL's filter binary tree to filterOperator binary tree. 
+ * + * @param node filter tree's node + * @return TSFile filterOperator binary tree + */ + private def transformFilter(node: Filter): FilterOperator = { + var operator: FilterOperator = null + node match { + case node: Not => + operator = new FilterOperator(SQLConstant.KW_NOT) + operator.addChildOPerator(transformFilter(node.child)) + operator + + case node: And => + operator = new FilterOperator(SQLConstant.KW_AND) + operator.addChildOPerator(transformFilter(node.left)) + operator.addChildOPerator(transformFilter(node.right)) + operator + + case node: Or => + operator = new FilterOperator(SQLConstant.KW_OR) + operator.addChildOPerator(transformFilter(node.left)) + operator.addChildOPerator(transformFilter(node.right)) + operator + + case node: EqualTo => + operator = new BasicOperator(SQLConstant.EQUAL, node.attribute, node.value.toString) + operator + + case node: LessThan => + operator = new BasicOperator(SQLConstant.LESSTHAN, node.attribute, node.value.toString) + operator + + case node: LessThanOrEqual => + operator = new BasicOperator(SQLConstant.LESSTHANOREQUALTO, node.attribute, node.value.toString) + operator + + case node: GreaterThan => + operator = new BasicOperator(SQLConstant.GREATERTHAN, node.attribute, node.value.toString) + operator + + case node: GreaterThanOrEqual => + operator = new BasicOperator(SQLConstant.GREATERTHANOREQUALTO, node.attribute, node.value.toString) + operator + + case _ => + throw new Exception("unsupported filter:" + node.toString) + } + } + + /** + * Transform SparkSQL's filter binary tree to TsFile's filter expression. + * + * @param schema to get relative columns' dataType information + * @param node filter tree's node + * @return TSFile filter expression + */ + private def transformFilterToExpression(schema: StructType, node: FilterOperator, device_name: String): IExpression = { + var filter: IExpression = null + node.getTokenIntType match { + case SQLConstant.KW_NOT => + throw new Exception("NOT filter is not supported now") + + case SQLConstant.KW_AND => + node.childOperators.foreach((child: FilterOperator) => { + if (filter == null) { + filter = transformFilterToExpression(schema, child, device_name) + } + else { + filter = BinaryExpression.and(filter, transformFilterToExpression(schema, child, device_name)) + } + }) + filter + + case SQLConstant.KW_OR => + node.childOperators.foreach((child: FilterOperator) => { + if (filter == null) { + filter = transformFilterToExpression(schema, child, device_name) + } + else { + filter = BinaryExpression.or(filter, transformFilterToExpression(schema, child, device_name)) + } + }) + filter + + + case SQLConstant.EQUAL => + val basicOperator = node.asInstanceOf[BasicOperator] + if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.eq(java.lang.Long.parseLong(basicOperator.getSeriesValue))) + } else { + filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.Eq, device_name) + } + filter + + case SQLConstant.LESSTHAN => + val basicOperator = node.asInstanceOf[BasicOperator] + if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.lt(java.lang.Long.parseLong(basicOperator.getSeriesValue))) + } else { + filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.Lt, device_name) + } + filter + + case SQLConstant.LESSTHANOREQUALTO => + val basicOperator = 
node.asInstanceOf[BasicOperator] + if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.ltEq(java.lang.Long.parseLong(basicOperator.getSeriesValue))) + } else { + filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.LtEq, device_name) + } + filter + + case SQLConstant.GREATERTHAN => + val basicOperator = node.asInstanceOf[BasicOperator] + if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.gt(java.lang.Long.parseLong(basicOperator.getSeriesValue))) + } else { + filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.Gt, device_name) + } + filter + + case SQLConstant.GREATERTHANOREQUALTO => + val basicOperator = node.asInstanceOf[BasicOperator] + if (QueryConstant.RESERVED_TIME.equals(basicOperator.getSeriesPath.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.gtEq(java.lang.Long.parseLong(basicOperator.getSeriesValue))) + } else { + filter = constructExpression(schema, basicOperator.getSeriesPath, basicOperator.getSeriesValue, FilterTypes.GtEq, device_name) + } + filter + + case other => + throw new Exception(s"Unsupported filter $other") + } + } + + + def constructExpression(schema: StructType, nodeName: String, nodeValue: String, filterType: FilterTypes.Value, device_name: String): IExpression = { + val fieldNames = schema.fieldNames + val index = fieldNames.indexOf(nodeName) + if (index == -1) { + // placeholder for an invalid filter in the current TsFile + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), null) + filter + } else { + val dataType = schema.get(index).dataType + + filterType match { + case FilterTypes.Eq => + dataType match { + case BooleanType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.eq(new java.lang.Boolean(nodeValue))) + filter + case IntegerType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.eq(new java.lang.Integer(nodeValue))) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.eq(new java.lang.Long(nodeValue))) + filter + case FloatType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.eq(new java.lang.Float(nodeValue))) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.eq(new java.lang.Double(nodeValue))) + filter + case StringType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.eq(nodeValue)) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + case FilterTypes.Gt => + dataType match { + case IntegerType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.gt(new java.lang.Integer(nodeValue))) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.gt(new java.lang.Long(nodeValue))) + filter + case FloatType => + val filter = new 
SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.gt(new java.lang.Float(nodeValue))) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.gt(new java.lang.Double(nodeValue))) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + case FilterTypes.GtEq => + dataType match { + case IntegerType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.gtEq(new java.lang.Integer(nodeValue))) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.gtEq(new java.lang.Long(nodeValue))) + filter + case FloatType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.gtEq(new java.lang.Float(nodeValue))) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.gtEq(new java.lang.Double(nodeValue))) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + case FilterTypes.Lt => + dataType match { + case IntegerType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.lt(new java.lang.Integer(nodeValue))) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.lt(new java.lang.Long(nodeValue))) + filter + case FloatType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.lt(new java.lang.Float(nodeValue))) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.lt(new java.lang.Double(nodeValue))) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + case FilterTypes.LtEq => + dataType match { + case IntegerType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.ltEq(new java.lang.Integer(nodeValue))) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.ltEq(new java.lang.Long(nodeValue))) + filter + case FloatType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.ltEq(new java.lang.Float(nodeValue))) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(device_name + SQLConstant.PATH_SEPARATOR + nodeName), + ValueFilter.ltEq(new java.lang.Double(nodeValue))) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + } + } + } +} diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala index 88a6755082..b421e94d78 100644 --- a/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala +++ b/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala @@ -33,13 +33,13 @@ private[tsfile] class TsFileOutputWriter( context: TaskAttemptContext) extends OutputWriter { 
private val recordWriter: RecordWriter[NullWritable, TSRecord] = { - val fileSchema = Converter.toTsFileSchema(dataSchema, options) + val fileSchema = WideConverter.toTsFileSchema(dataSchema, options) new TsFileOutputFormat(fileSchema).getRecordWriter(context) } override def write(row: InternalRow): Unit = { if (row != null) { - val tsRecord = Converter.toTsRecord(row, dataSchema) + val tsRecord = WideConverter.toTsRecord(row, dataSchema) tsRecord.foreach(r => { recordWriter.write(NullWritable.get(), r) }) diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/WideConverter.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/WideConverter.scala new file mode 100755 index 0000000000..17683a3292 --- /dev/null +++ b/spark-tsfile/src/main/scala/org/apache/iotdb/tsfile/WideConverter.scala @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile + +import java.util + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor +import org.apache.iotdb.tsfile.common.constant.QueryConstant +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData +import org.apache.iotdb.tsfile.file.metadata.enums.{TSDataType, TSEncoding} +import org.apache.iotdb.tsfile.io.HDFSInput +import org.apache.iotdb.tsfile.read.TsFileSequenceReader +import org.apache.iotdb.tsfile.read.common.{Field, Path} +import org.apache.iotdb.tsfile.read.expression.impl.{BinaryExpression, GlobalTimeExpression, SingleSeriesExpression} +import org.apache.iotdb.tsfile.read.expression.{IExpression, QueryExpression} +import org.apache.iotdb.tsfile.read.filter.{TimeFilter, ValueFilter} +import org.apache.iotdb.tsfile.utils.Binary +import org.apache.iotdb.tsfile.write.record.TSRecord +import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint +import org.apache.iotdb.tsfile.write.schema.{Schema, MeasurementSchema, SchemaBuilder} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + + +/** + * This object contains methods that are used to convert schema and data between SparkSQL and TSFile. + * + */ +object WideConverter extends Converter{ + + /** + * Get series from the given tsFileMetaData. 
+ * + * @param tsFileMetaData TsFileMetaData + * @return union series + */ + def getSeries(tsFileMetaData: TsFileMetaData): util.ArrayList[Series] = { + val series = new util.ArrayList[Series]() + + val devices = tsFileMetaData.getDeviceMap.keySet() + val measurements = tsFileMetaData.getMeasurementSchema + + devices.foreach(d => { + measurements.foreach(m => { + val fullPath = d + "." + m._1 + series.add(new Series(fullPath, m._2.getType) + ) + }) + }) + + series + } + + /** + * Get union series in all tsfiles. + * e.g. (tsfile1:s1,s2) & (tsfile2:s2,s3) = s1,s2,s3 + * + * @param files tsfiles + * @param conf hadoop configuration + * @return union series + */ + def getUnionSeries(files: Seq[FileStatus], conf: Configuration): util.ArrayList[Series] = { + val unionSeries = new util.ArrayList[Series]() + var seriesSet: mutable.Set[String] = mutable.Set() + + files.foreach(f => { + val in = new HDFSInput(f.getPath, conf) + val reader = new TsFileSequenceReader(in) + val tsFileMetaData = reader.readFileMetadata + val devices = tsFileMetaData.getDeviceMap.keySet() + val measurements = tsFileMetaData.getMeasurementSchema + + devices.foreach(d => { + measurements.foreach(m => { + val fullPath = d + "." + m._1 + if (!seriesSet.contains(fullPath)) { + seriesSet += fullPath + unionSeries.add(new Series(fullPath, m._2.getType) + ) + } + }) + }) + in.close() + }) + + unionSeries + } + + /** + * Prepare queriedSchema from requiredSchema. + * + * @param requiredSchema requiredSchema + * @param tsFileMetaData tsFileMetaData + * @return + */ + def prepSchema(requiredSchema: StructType, tsFileMetaData: TsFileMetaData): StructType = { + var queriedSchema: StructType = new StructType() + + if (requiredSchema.isEmpty + || (requiredSchema.size == 1 && requiredSchema.iterator.next().name == QueryConstant.RESERVED_TIME)) { + // for example, (i) select count(*) from table; (ii) select time from table + + val fileSchema = WideConverter.getSeries(tsFileMetaData) + queriedSchema = StructType(toSqlField(fileSchema, false).toList) + + } else { // Remove nonexistent schema according to the current file's metadata. + // This may happen when queried TsFiles in the same folder do not have the same schema. + + val devices = tsFileMetaData.getDeviceMap.keySet() + val measurementIds = tsFileMetaData.getMeasurementSchema.keySet() + requiredSchema.foreach(f => { + if (!QueryConstant.RESERVED_TIME.equals(f.name)) { + val path = new org.apache.iotdb.tsfile.read.common.Path(f.name) + if (devices.contains(path.getDevice) && measurementIds.contains(path.getMeasurement)) { + queriedSchema = queriedSchema.add(f) + } + } + }) + + } + + queriedSchema + } + + + /** + * Construct queryExpression based on queriedSchema and filters. 
+ * + * @param schema selected columns + * @param filters filters + * @return query expression + */ + def toQueryExpression(schema: StructType, filters: Seq[Filter]): QueryExpression = { + //get paths from schema + val paths = new util.ArrayList[org.apache.iotdb.tsfile.read.common.Path] + schema.foreach(f => { + if (!QueryConstant.RESERVED_TIME.equals(f.name)) { // the time field is excluded + paths.add(new org.apache.iotdb.tsfile.read.common.Path(f.name)) + } + }) + + //remove invalid filters + val validFilters = new ListBuffer[Filter]() + filters.foreach { f => { + if (isValidFilter(f)) + validFilters.add(f) + } + } + if (validFilters.isEmpty) { + val queryExpression = QueryExpression.create(paths, null) + queryExpression + } else { + //construct filters to a binary tree + var filterTree = validFilters.get(0) + for (i <- 1 until validFilters.length) { + filterTree = And(filterTree, validFilters.get(i)) + } + + //convert filterTree to FilterOperator + val finalFilter = transformFilter(schema, filterTree) + + // create query expression + val queryExpression = QueryExpression.create(paths, finalFilter) + + queryExpression + } + } + + /** + * Construct fields with the TSFile data type converted to the SparkSQL data type. + * + * @param tsfileSchema tsfileSchema + * @param addTimeField true to add a time field; false to not + * @return the converted list of fields + */ + def toSqlField(tsfileSchema: util.ArrayList[Series], addTimeField: Boolean): ListBuffer[StructField] = { + val fields = new ListBuffer[StructField]() + + if (addTimeField) { + fields += StructField(QueryConstant.RESERVED_TIME, LongType, nullable = false) + } + + tsfileSchema.foreach((series: Series) => { + fields += StructField(series.getName, series.getType match { + case TSDataType.BOOLEAN => BooleanType + case TSDataType.INT32 => IntegerType + case TSDataType.INT64 => LongType + case TSDataType.FLOAT => FloatType + case TSDataType.DOUBLE => DoubleType + case TSDataType.TEXT => StringType + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + }, nullable = true) + }) + + fields + } + + private def isValidFilter(filter: Filter): Boolean = { + filter match { + case f: EqualTo => true + case f: GreaterThan => true + case f: GreaterThanOrEqual => true + case f: LessThan => true + case f: LessThanOrEqual => true + case f: Or => isValidFilter(f.left) && isValidFilter(f.right) + case f: And => isValidFilter(f.left) && isValidFilter(f.right) + case f: Not => isValidFilter(f.child) + case _ => false + } + } + + /** + * Transform SparkSQL's filter binary tree to TsFile's filter expression. 
+ * + * @param schema to get relative columns' dataType information + * @param node filter tree's node + * @return TSFile filter expression + */ + private def transformFilter(schema: StructType, node: Filter): IExpression = { + var filter: IExpression = null + node match { + case node: Not => + throw new Exception("NOT filter is not supported now") + + case node: And => + filter = BinaryExpression.and(transformFilter(schema, node.left), transformFilter(schema, node.right)) + filter + + case node: Or => + filter = BinaryExpression.or(transformFilter(schema, node.left), transformFilter(schema, node.right)) + filter + + case node: EqualTo => + if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.eq(node.value.asInstanceOf[java.lang.Long])) + } else { + filter = constructFilter(schema, node.attribute, node.value, FilterTypes.Eq) + } + filter + + case node: LessThan => + if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.lt(node.value.asInstanceOf[java.lang.Long])) + } else { + filter = constructFilter(schema, node.attribute, node.value, FilterTypes.Lt) + } + filter + + case node: LessThanOrEqual => + if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.ltEq(node.value.asInstanceOf[java.lang.Long])) + } else { + filter = constructFilter(schema, node.attribute, node.value, FilterTypes.LtEq) + } + filter + + case node: GreaterThan => + if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.gt(node.value.asInstanceOf[java.lang.Long])) + } else { + filter = constructFilter(schema, node.attribute, node.value, FilterTypes.Gt) + } + filter + + case node: GreaterThanOrEqual => + if (QueryConstant.RESERVED_TIME.equals(node.attribute.toLowerCase())) { + filter = new GlobalTimeExpression(TimeFilter.gtEq(node.value.asInstanceOf[java.lang.Long])) + } else { + filter = constructFilter(schema, node.attribute, node.value, FilterTypes.GtEq) + } + filter + + case other => + throw new Exception(s"Unsupported filter $other") + } + } + + def constructFilter(schema: StructType, nodeName: String, nodeValue: Any, filterType: FilterTypes.Value): IExpression = { + val fieldNames = schema.fieldNames + val index = fieldNames.indexOf(nodeName) + if (index == -1) { + // placeholder for an invalid filter in the current TsFile + val filter = new SingleSeriesExpression(new Path(nodeName), null) + filter + } else { + val dataType = schema.get(index).dataType + + filterType match { + case FilterTypes.Eq => + dataType match { + case BooleanType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Boolean])) + filter + case IntegerType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Integer])) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Long])) + filter + case FloatType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Float])) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.eq(nodeValue.asInstanceOf[java.lang.Double])) + filter + case StringType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.eq(new 
Binary(nodeValue.toString))) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + case FilterTypes.Gt => + dataType match { + case IntegerType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.gt(nodeValue.asInstanceOf[java.lang.Integer])) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.gt(nodeValue.asInstanceOf[java.lang.Long])) + filter + case FloatType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.gt(nodeValue.asInstanceOf[java.lang.Float])) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.gt(nodeValue.asInstanceOf[java.lang.Double])) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + case FilterTypes.GtEq => + dataType match { + case IntegerType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.gtEq(nodeValue.asInstanceOf[java.lang.Integer])) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.gtEq(nodeValue.asInstanceOf[java.lang.Long])) + filter + case FloatType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.gtEq(nodeValue.asInstanceOf[java.lang.Float])) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.gtEq(nodeValue.asInstanceOf[java.lang.Double])) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + case FilterTypes.Lt => + dataType match { + case IntegerType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.lt(nodeValue.asInstanceOf[java.lang.Integer])) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.lt(nodeValue.asInstanceOf[java.lang.Long])) + filter + case FloatType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.lt(nodeValue.asInstanceOf[java.lang.Float])) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.lt(nodeValue.asInstanceOf[java.lang.Double])) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + case FilterTypes.LtEq => + dataType match { + case IntegerType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.ltEq(nodeValue.asInstanceOf[java.lang.Integer])) + filter + case LongType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.ltEq(nodeValue.asInstanceOf[java.lang.Long])) + filter + case FloatType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.ltEq(nodeValue.asInstanceOf[java.lang.Float])) + filter + case DoubleType => + val filter = new SingleSeriesExpression(new Path(nodeName), + ValueFilter.ltEq(nodeValue.asInstanceOf[java.lang.Double])) + filter + case other => throw new UnsupportedOperationException(s"Unsupported type $other") + } + } + } + } +} diff --git a/spark-tsfile/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala b/spark-tsfile/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala index 5edc490eed..2bbe8724cb 100644 --- a/spark-tsfile/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala +++ b/spark-tsfile/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala @@ -87,7 +87,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll { val reader: 
TsFileSequenceReader = new TsFileSequenceReader(in) val tsFileMetaData = reader.readFileMetadata - val series = Converter.getSeries(tsFileMetaData) + val series = WideConverter.getSeries(tsFileMetaData) Assert.assertEquals(6, series.size()) Assert.assertEquals("[device_1.sensor_3,INT32]", series.get(0).toString) @@ -111,7 +111,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll { val statusSeq: Seq[FileStatus] = Seq(status1, status2) - val tsfileSchema = Converter.getUnionSeries(statusSeq, conf) + val tsfileSchema = WideConverter.getUnionSeries(statusSeq, conf) Assert.assertEquals(tsfileSchema.size(), 6) Assert.assertEquals("[device_1.sensor_3,INT32]", tsfileSchema.get(0).toString) @@ -136,12 +136,12 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll { val stringField = new Field(TSDataType.TEXT) stringField.setBinaryV(new Binary("pass")) - Assert.assertEquals(Converter.toSqlValue(boolField), true) - Assert.assertEquals(Converter.toSqlValue(intField), 32) - Assert.assertEquals(Converter.toSqlValue(longField), 64l) - Assert.assertEquals(Converter.toSqlValue(floatField), 3.14f) - Assert.assertEquals(Converter.toSqlValue(doubleField), 0.618d) - Assert.assertEquals(Converter.toSqlValue(stringField), "pass") + Assert.assertEquals(WideConverter.toSqlValue(boolField), true) + Assert.assertEquals(WideConverter.toSqlValue(intField), 32) + Assert.assertEquals(WideConverter.toSqlValue(longField), 64l) + Assert.assertEquals(WideConverter.toSqlValue(floatField), 3.14f) + Assert.assertEquals(WideConverter.toSqlValue(doubleField), 0.618d) + Assert.assertEquals(WideConverter.toSqlValue(stringField), "pass") } test("testToSparkSqlSchema") { @@ -153,7 +153,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll { fields.add(new Series("device_2.sensor_1", TSDataType.FLOAT)) fields.add(new Series("device_2.sensor_2", TSDataType.INT32)) - val sqlSchema = Converter.toSqlSchema(fields) + val sqlSchema = WideConverter.toSqlSchema(fields) val expectedFields: util.ArrayList[StructField] = new util.ArrayList[StructField]() expectedFields.add(StructField(QueryConstant.RESERVED_TIME, LongType, false)) @@ -179,7 +179,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll { requiredFields.add(StructField("device_1.sensor_2", IntegerType, true)) val requiredSchema = StructType(requiredFields) - val filteredSchema = Converter.prepSchema(requiredSchema, tsFileMetaData) + val filteredSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData) Assert.assertEquals(3, filteredSchema.size) val fields = filteredSchema.fields @@ -199,7 +199,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll { requiredFields.add(StructField(QueryConstant.RESERVED_TIME, LongType, false)) val requiredSchema = StructType(requiredFields) - val filteredSchema = Converter.prepSchema(requiredSchema, tsFileMetaData) + val filteredSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData) Assert.assertEquals(6, filteredSchema.size) val fields = filteredSchema.fields @@ -225,7 +225,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll { val schema = StructType(fields) val row: InternalRow = new GenericInternalRow(Array(1L, null, 1.2f, 20, 19, 2.3f, 11)) - val records = Converter.toTsRecord(row, schema) + val records = WideConverter.toTsRecord(row, schema) Assert.assertEquals(2, records.size) Assert.assertEquals(1, records(0).time) @@ -256,7 +256,7 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll { val ft4 = LessThan("time", 4L) val filters: Seq[Filter] = 
Seq(ft1, ft2_3, ft4) - val expression = Converter.toQueryExpression(schema, filters) + val expression = WideConverter.toQueryExpression(schema, filters) Assert.assertEquals(true, expression.hasQueryFilter) Assert.assertEquals(2, expression.getSelectedSeries.size()) -- GitLab
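As a quick sanity check of the new narrow schema (this snippet is illustrative and not part of the patch; the `Series` fixtures mirror the ones already used in `ConverterTest`, and the measurement names are made up), `NarrowConverter.toSqlField` is expected to emit `device_name` immediately after `time`, followed by the union of measurement columns:

```scala
import java.util

import org.apache.iotdb.tsfile._
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType

// Hypothetical fixture: two measurements, similar to the existing ConverterTest cases.
val series = new util.ArrayList[Series]()
series.add(new Series("sensor_1", TSDataType.FLOAT))
series.add(new Series("sensor_2", TSDataType.INT32))

// Expected column order: time (LongType), device_name (StringType), sensor_1, sensor_2.
val sqlFields = NarrowConverter.toSqlField(series, true)
sqlFields.foreach(f => println(s"${f.name}: ${f.dataType}"))
```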