From 45a665866ed830456d0d44ece0c02c37166e22fe Mon Sep 17 00:00:00 2001 From: Xuan Ronaldo Date: Mon, 6 Dec 2021 11:30:43 +0800 Subject: [PATCH] [IoTDB-1989] IoTDB support insert data from Spark (#4477) 1. fix the bug of count statement. 2. update the ITs of spark connector. --- .../Ecosystem Integration/Spark IoTDB.md | 53 +++++- .../Ecosystem Integration/Spark IoTDB.md | 58 ++++++- spark-iotdb-connector/pom.xml | 36 +--- .../org/apache/iotdb/spark/db/Converter.scala | 6 +- .../apache/iotdb/spark/db/DefaultSource.scala | 30 +++- .../apache/iotdb/spark/db/IoTDBOptions.scala | 2 +- .../org/apache/iotdb/spark/db/IoTDBRDD.scala | 7 +- .../iotdb/spark/db/tools/DataFrameTools.java | 162 ++++++++++++++++++ .../org/apache/iotdb/spark/db/IoTDBTest.scala | 32 ++-- .../iotdb/spark/db/IoTDBWriteTest.scala | 117 +++++++++++++ 10 files changed, 435 insertions(+), 68 deletions(-) create mode 100644 spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java create mode 100644 spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala diff --git a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md index a91180199d..36388b8efc 100644 --- a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md +++ b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md @@ -18,7 +18,7 @@ under the License. --> -## Spark-IoTDB +# Spark-IoTDB ### version The versions required for Spark and Java are as follow: @@ -45,8 +45,9 @@ mvn clean scala:compile compile install ``` +## Read Data from IoTDB -#### spark-shell user guide +### spark-shell user guide ``` spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar @@ -76,7 +77,7 @@ df.printSchema() df.show() ``` -#### Schema Inference +### Schema Inference Take the following TsFile structure as an example: There are three Measurements in the TsFile schema: status, temperature, and hardware. 
The basic information of these three measurements is as follows: @@ -114,7 +115,7 @@ You can also use narrow table form which as follows: (You can see part 4 about h | 5 | root.ln.wf02.wt01 | false | null | null | | 6 | root.ln.wf02.wt02 | null | ccc | null | -#### Get narrow form of data +### Get narrow form of data ``` spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar @@ -127,7 +128,7 @@ df.printSchema() df.show() ``` -#### Java user guide +### Java user guide ``` import org.apache.spark.sql.Dataset; @@ -158,3 +159,45 @@ public class Example { } ``` +## Write Data to IoTDB + +### User Guide +``` scala +// import narrow table +val df = spark.createDataFrame(List( + (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"), + (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world"))) + +val dfWithColumn = df.withColumnRenamed("_1", "Time") + .withColumnRenamed("_2", "device_name") + .withColumnRenamed("_3", "s0") + .withColumnRenamed("_4", "s1") + .withColumnRenamed("_5", "s2") + .withColumnRenamed("_6", "s3") + .withColumnRenamed("_7", "s4") + .withColumnRenamed("_8", "s5") +dfWithColumn + .write + .format("org.apache.iotdb.spark.db") + .option("url", "jdbc:iotdb://127.0.0.1:6667/") + .save + +// import wide table +val df = spark.createDataFrame(List( + (1L, 1, 1L, 1.0F, 1.0D, true, "hello"), + (2L, 2, 2L, 2.0F, 2.0D, false, "world"))) + +val dfWithColumn = df.withColumnRenamed("_1", "Time") + .withColumnRenamed("_2", "root.test.d0.s0") + .withColumnRenamed("_3", "root.test.d0.s1") + .withColumnRenamed("_4", "root.test.d0.s2") + .withColumnRenamed("_5", "root.test.d0.s3") + .withColumnRenamed("_6", "root.test.d0.s4") + .withColumnRenamed("_7", "root.test.d0.s5") +dfWithColumn.write.format("org.apache.iotdb.spark.db") + .option("url", "jdbc:iotdb://127.0.0.1:6667/") + .save +``` + +### Notes +1. You can directly write data to IoTDB whatever the dataframe contains a wide table or a narrow table. 
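+
+The write path requires a `Time` column. If the dataframe contains a `device_name` column it is treated as a narrow table; otherwise every other column name must be a full series path starting with `root.`. The `user` and `password` options can also be set and both default to `root`. Below is a minimal sketch for spark-shell (where `spark` is predefined), assuming a local IoTDB instance with the default credentials; it writes a small narrow table and reads it back to verify:
+
+``` scala
+// a narrow table: Time, device_name, then measurement columns
+val df = spark.createDataFrame(List(
+    (1L, "root.test.d0", 1, 1.0F),
+    (2L, "root.test.d0", 2, 2.0F)))
+  .toDF("Time", "device_name", "s0", "s1")
+
+df.write
+  .format("org.apache.iotdb.spark.db")
+  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+  .option("user", "root")      // optional, defaults to "root"
+  .option("password", "root")  // optional, defaults to "root"
+  .save
+
+// read the written data back to verify
+spark.read
+  .format("org.apache.iotdb.spark.db")
+  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+  .option("sql", "select ** from root")
+  .load
+  .show()
+```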
\ No newline at end of file diff --git a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md index f28d82a247..21b0cda8b2 100644 --- a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md +++ b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md @@ -19,9 +19,9 @@ --> -## Spark-IoTDB +# Spark-IoTDB -### 版本 +## 版本 Spark 和 Java 所需的版本如下: @@ -29,7 +29,7 @@ Spark 和 Java 所需的版本如下: | ------------- | ------------- | ------------ | -------- | | `2.4.3` | `2.11` | `1.8` | `0.13.0-SNAPSHOT` | -### 安装 +## 安装 mvn clean scala:compile compile install @@ -43,7 +43,9 @@ mvn clean scala:compile compile install ``` -#### Spark-shell 用户指南 +## 从IoTDB读取数据 + +### Spark-shell 用户指南 ``` spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar @@ -73,7 +75,7 @@ df.printSchema() df.show() ``` -#### 模式推断 +### 模式推断 以下 TsFile 结构为例:TsFile 模式中有三个度量:状态,温度和硬件。 这三种测量的基本信息如下: @@ -118,7 +120,7 @@ time|d1.status|time|d1.temperature |time | d2.hardware |time|d2.status | 5 | root.ln.wf02.wt01 | false | null | null | | 6 | root.ln.wf02.wt02 | null | ccc | null | -#### 获取窄表格式的数据 +### 获取窄表格式的数据 ``` spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar @@ -131,7 +133,7 @@ df.printSchema() df.show() ``` -#### Java 用户指南 +### Java 用户指南 ``` import org.apache.spark.sql.Dataset; @@ -161,3 +163,45 @@ public class Example { } } ``` + +## 写数据到IoTDB +### 用户指南 +``` scala +// import narrow table +val df = spark.createDataFrame(List( + (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"), + (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world"))) + +val dfWithColumn = df.withColumnRenamed("_1", "Time") + .withColumnRenamed("_2", "device_name") + .withColumnRenamed("_3", "s0") + .withColumnRenamed("_4", "s1") + .withColumnRenamed("_5", "s2") + .withColumnRenamed("_6", "s3") + .withColumnRenamed("_7", "s4") + .withColumnRenamed("_8", "s5") +dfWithColumn + .write + .format("org.apache.iotdb.spark.db") + .option("url", "jdbc:iotdb://127.0.0.1:6667/") + .save + +// import wide table +val df = spark.createDataFrame(List( + (1L, 1, 1L, 1.0F, 1.0D, true, "hello"), + (2L, 2, 2L, 2.0F, 2.0D, false, "world"))) + +val dfWithColumn = df.withColumnRenamed("_1", "Time") + .withColumnRenamed("_2", "root.test.d0.s0") + .withColumnRenamed("_3", "root.test.d0.s1") + .withColumnRenamed("_4", "root.test.d0.s2") + .withColumnRenamed("_5", "root.test.d0.s3") + .withColumnRenamed("_6", "root.test.d0.s4") + .withColumnRenamed("_7", "root.test.d0.s5") +dfWithColumn.write.format("org.apache.iotdb.spark.db") + .option("url", "jdbc:iotdb://127.0.0.1:6667/") + .save +``` + +### 注意 +1. 
无论dataframe中存放的是窄表还是宽表,都可以直接将数据写到IoTDB中。 \ No newline at end of file diff --git a/spark-iotdb-connector/pom.xml b/spark-iotdb-connector/pom.xml index f10ec0fe37..c37cb7b44e 100644 --- a/spark-iotdb-connector/pom.xml +++ b/spark-iotdb-connector/pom.xml @@ -32,6 +32,8 @@ UTF-8 1.8 + 2.11 + 2.11.12 @@ -52,35 +54,10 @@ test - org.apache.hadoop - hadoop-client - - - com.google.guava - guava - - - - commons-configuration - commons-configuration - - - - - - commons-configuration - commons-configuration - 1.6 - - - commons-lang - commons-lang - - + org.apache.iotdb + iotdb-session + ${project.version} - com.google.guava guava @@ -97,16 +74,19 @@ org.apache.spark spark-core_2.11 + ${spark.version} provided org.apache.spark spark-sql_2.11 + ${spark.version} provided org.scala-lang scala-library + ${scala.version} org.scalatest diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala index da3435dadd..023e4f8ac7 100644 --- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala +++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala @@ -65,7 +65,11 @@ object Converter { } val colCount = resultSetMetaData.getColumnCount - for (i <- 2 to colCount) { + var startIndex = 2 + if (!"Time".equals(resultSetMetaData.getColumnName(1))) { + startIndex = 1 + } + for (i <- startIndex to colCount) { fields += StructField(resultSetMetaData.getColumnLabel(i), resultSetMetaData.getColumnType(i) match { case Types.BOOLEAN => BooleanType case Types.INTEGER => IntegerType diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala index 60ec74acdc..9812b14e5a 100644 --- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala +++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala @@ -19,11 +19,14 @@ package org.apache.iotdb.spark.db -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} + +import org.apache.iotdb.spark.db.tools.DataFrameTools +import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} import org.slf4j.LoggerFactory -private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegister { + +private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegister with CreatableRelationProvider { private final val logger = LoggerFactory.getLogger(classOf[DefaultSource]) override def shortName(): String = "tsfile" @@ -34,10 +37,27 @@ private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegis val iotdbOptions = new IoTDBOptions(parameters) - if (iotdbOptions.url == null || iotdbOptions.sql == null) { - sys.error("IoTDB url or sql not specified") + if ("".equals(iotdbOptions.sql)) { + sys.error("sql not specified") } + new IoTDBRelation(iotdbOptions)(sqlContext.sparkSession) + } + + override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { + if (!data.columns.contains("Time")) { + sys.error("No `Time` column") + } + val iotdbOptions = new IoTDBOptions(parameters) + if (!data.columns.contains("device_name")) { + data.columns.foreach(column => if 
(!column.startsWith("root.") && column != "Time") sys.error("Invalidate column: " + column)) + val narrowDf = Transformer.toNarrowForm(sqlContext.sparkSession, data) + DataFrameTools.insertDataFrame(iotdbOptions, narrowDf) + } else { + DataFrameTools.insertDataFrame(iotdbOptions, data) + } + + new IoTDBRelation(iotdbOptions)(sqlContext.sparkSession) } } \ No newline at end of file diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala index c4aaa3da9d..62bb59ed50 100644 --- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala +++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala @@ -29,7 +29,7 @@ class IoTDBOptions( val password = parameters.getOrElse("password", "root") - val sql = parameters.getOrElse("sql", sys.error("Option 'sql' not specified")) + val sql = parameters.getOrElse("sql", "") val numPartition = parameters.getOrElse("numPartition", "1") diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala index c5d2cb3733..7c811e4fa7 100644 --- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala +++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala @@ -60,11 +60,10 @@ class IoTDBRDD private[iotdb]( val part = split.asInstanceOf[IoTDBPartition] var taskInfo: String = _ - Option(TaskContext.get()).foreach { taskContext => { + Option(TaskContext.get()).foreach { taskContext => taskContext.addTaskCompletionListener { _ => conn.close() } taskInfo = "task Id: " + taskContext.taskAttemptId() + " partition Id: " + taskContext.partitionId() } - } Class.forName("org.apache.iotdb.jdbc.IoTDBDriver") val conn: Connection = DriverManager.getConnection(options.url, options.user, options.password) @@ -112,8 +111,8 @@ class IoTDBRDD private[iotdb]( override def hasNext: Boolean = { if (!finished && !gotNext) { - nextValue = getNext - gotNext = true + nextValue = getNext + gotNext = true } !finished } diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java new file mode 100644 index 0000000000..aee647d7a8 --- /dev/null +++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.spark.db.tools;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.spark.db.IoTDBOptions;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.thrift.annotation.Nullable;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.FLOAT;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT32;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT64;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.TEXT;
+
+public class DataFrameTools {
+  /**
+   * Insert a narrow-table DataFrame into IoTDB.
+   *
+   * @param options the options used to create an IoTDB session
+   * @param dataFrame a DataFrame in narrow-table form (Time, device_name, then measurements)
+   */
+  public static void insertDataFrame(IoTDBOptions options, Dataset<Row> dataFrame) {
+    List<Tuple2<String, String>> sensorTypes = new ArrayList<>(Arrays.asList(dataFrame.dtypes()));
+    sensorTypes.remove(0);
+    sensorTypes.remove(0);
+
+    List<Row> devices = dataFrame.select("device_name").distinct().collectAsList();
+
+    Dataset<Row> repartition = dataFrame.repartition(dataFrame.col("device_name"));
+
+    for (Row row : devices) {
+      String device = row.get(0).toString();
+      repartition
+          .where(String.format("device_name == '%s'", device))
+          .foreachPartition(
+              partition -> {
+                String[] hostPort = options.url().split("//")[1].replace("/", "").split(":");
+                Session session =
+                    new Session.Builder()
+                        .host(hostPort[0])
+                        .port(Integer.valueOf(hostPort[1]))
+                        .username(String.valueOf(options.user()))
+                        .password(String.valueOf(options.password()))
+                        .build();
+                session.open();
+                partition.forEachRemaining(
+                    record -> {
+                      ArrayList<String> measurements = new ArrayList<>();
+                      ArrayList<TSDataType> types = new ArrayList<>();
+                      ArrayList<Object> values = new ArrayList<>();
+                      for (int i = 2; i < record.length(); i++) {
+                        Object value = record.get(i);
+                        if (value == null) {
+                          continue;
+                        }
+                        value =
+                            typeTrans(record.get(i).toString(), getType(sensorTypes.get(i - 2)._2));
+
+                        values.add(value);
+                        measurements.add(sensorTypes.get(i - 2)._1);
+                        types.add(getType(sensorTypes.get(i - 2)._2));
+                      }
+                      try {
+                        session.insertRecord(
+                            record.get(1).toString(),
+                            (Long) record.get(0),
+                            measurements,
+                            types,
+                            values);
+                      } catch (IoTDBConnectionException e) {
+                        e.printStackTrace();
+                      } catch (StatementExecutionException e) {
+                        e.printStackTrace();
+                      }
+                    });
+                session.close();
+              });
+    }
+  }
+  /**
+   * @param value the string form of a column value
+   * @param type the target IoTDB data type
+   * @return the value converted to the target type, or null if it cannot be parsed
+   */
+  private static Object typeTrans(String value, TSDataType type) {
+    try {
+      switch (type) {
+        case TEXT:
+          return value;
+        case BOOLEAN:
+          return Boolean.valueOf(value);
+        case INT32:
+          return Integer.valueOf(value);
+        case INT64:
+          return Long.valueOf(value);
+        case FLOAT:
+          return Float.valueOf(value);
+        case DOUBLE:
+          return Double.valueOf(value);
+        default:
+          return null;
+      }
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Map a Spark SQL type name to the corresponding TSDataType.
+   *
+   * @param typeStr the Spark SQL data type name, e.g. "IntegerType"
+   * @return the matching TSDataType, or null for an unsupported type
+   */
+  private static TSDataType getType(String typeStr) {
+    switch (typeStr) {
+      case "StringType":
+        return TEXT;
+      case
"BooleanType": + return BOOLEAN; + case "IntegerType": + return INT32; + case "LongType": + return INT64; + case "FloatType": + return FLOAT; + case "DoubleType": + return DOUBLE; + default: + return null; + } + } +} diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala index 2e84ac9ec0..26ba987afd 100644 --- a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala +++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala @@ -20,7 +20,6 @@ package org.apache.iotdb.spark.db import java.io.ByteArrayOutputStream - import org.apache.iotdb.db.conf.IoTDBConstant import org.apache.iotdb.db.service.IoTDB import org.apache.iotdb.jdbc.Config @@ -70,32 +69,32 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll { } test("test show data") { - val df = spark.read.format("org.apache.iotdb.sparkdb") - .option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root.**").load + val df = spark.read.format("org.apache.iotdb.spark.db") + .option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select ** from root").load Assert.assertEquals(7505, df.count()) } test("test show data with partition") { - val df = spark.read.format("org.apache.iotdb.sparkdb") + val df = spark.read.format("org.apache.iotdb.spark.db") .option("url", "jdbc:iotdb://127.0.0.1:6667/") - .option("sql", "select * from root.**") + .option("sql", "select ** from root") .option("lowerBound", 1).option("upperBound", System.nanoTime() / 1000 / 1000) .option("numPartition", 10).load Assert.assertEquals(7505, df.count()) } test("test filter data") { - val df = spark.read.format("org.apache.iotdb.sparkdb") + val df = spark.read.format("org.apache.iotdb.spark.db") .option("url", "jdbc:iotdb://127.0.0.1:6667/") - .option("sql", "select * from root.** where time < 2000 and time > 1000").load + .option("sql", "select ** from root where time < 2000 and time > 1000").load Assert.assertEquals(499, df.count()) } test("test filter data with partition") { - val df = spark.read.format("org.apache.iotdb.sparkdb") + val df = spark.read.format("org.apache.iotdb.spark.db") .option("url", "jdbc:iotdb://127.0.0.1:6667/") - .option("sql", "select * from root.** where time < 2000 and time > 1000") + .option("sql", "select ** from root where time < 2000 and time > 1000") .option("lowerBound", 1) .option("upperBound", 10000).option("numPartition", 10).load @@ -103,17 +102,17 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll { } test("test transform to narrow") { - val df = spark.read.format("org.apache.iotdb.sparkdb") + val df = spark.read.format("org.apache.iotdb.spark.db") .option("url", "jdbc:iotdb://127.0.0.1:6667/") - .option("sql", "select * from root.** where time < 1100 and time > 1000").load + .option("sql", "select ** from root where time < 1100 and time > 1000").load val narrow_df = Transformer.toNarrowForm(spark, df) Assert.assertEquals(198, narrow_df.count()) } test("test transform to narrow with partition") { - val df = spark.read.format("org.apache.iotdb.sparkdb") + val df = spark.read.format("org.apache.iotdb.spark.db") .option("url", "jdbc:iotdb://127.0.0.1:6667/") - .option("sql", "select * from root.** where time < 1100 and time > 1000") + .option("sql", "select ** from root where time < 1100 and time > 1000") .option("lowerBound", 1).option("upperBound", 10000) .option("numPartition", 10).load val narrow_df = Transformer.toNarrowForm(spark, df) 
@@ -121,16 +120,16 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll { } test("test transform back to wide") { - val df = spark.read.format("org.apache.iotdb.sparkdb") + val df = spark.read.format("org.apache.iotdb.spark.db") .option("url", "jdbc:iotdb://127.0.0.1:6667/") - .option("sql", "select * from root.** where time < 1100 and time > 1000").load + .option("sql", "select ** from root where time < 1100 and time > 1000").load val narrow_df = Transformer.toNarrowForm(spark, df) val wide_df = Transformer.toWideForm(spark, narrow_df) Assert.assertEquals(99, wide_df.count()) } test("test aggregate sql") { - val df = spark.read.format("org.apache.iotdb.sparkdb") + val df = spark.read.format("org.apache.iotdb.spark.db") .option("url", "jdbc:iotdb://127.0.0.1:6667/") .option("sql", "select count(d0.s0),count(d0.s1) from root.vehicle").load @@ -139,7 +138,6 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll { df.show(df.count.toInt, false) } val actual = outCapture.toByteArray.map(_.toChar) - val expect = "+-------------------------+-------------------------+\n" + "|count(root.vehicle.d0.s0)|count(root.vehicle.d0.s1)|\n" + diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala new file mode 100644 index 0000000000..0c5928daf4 --- /dev/null +++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.spark.db
+
+import org.apache.iotdb.db.conf.IoTDBConstant
+import org.apache.iotdb.db.service.IoTDB
+import org.apache.iotdb.jdbc.Config
+import org.apache.iotdb.session.Session
+import org.apache.spark.sql.SparkSession
+import org.junit.{AfterClass, Before}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
+  private var daemon: IoTDB = _
+  private var spark: SparkSession = _
+  private var session: Session = _
+
+  @Before
+  override protected def beforeAll(): Unit = {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/")
+    super.beforeAll()
+
+    EnvironmentUtils.closeStatMonitor()
+    daemon = IoTDB.getInstance
+    daemon.active()
+    EnvironmentUtils.envSetUp()
+    Class.forName(Config.JDBC_DRIVER_NAME)
+
+    spark = SparkSession
+      .builder()
+      .config("spark.master", "local")
+      .appName("TSFile test")
+      .getOrCreate()
+
+    session = new Session("127.0.0.1", 6667, "root", "root")
+    session.open()
+  }
+
+  @AfterClass
+  override protected def afterAll(): Unit = {
+    if (spark != null) {
+      spark.sparkContext.stop()
+    }
+
+    daemon.stop()
+    EnvironmentUtils.cleanEnv()
+
+    session.close()
+    super.afterAll()
+  }
+
+  test("test insert wide data") {
+    val df = spark.createDataFrame(List(
+      (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
+      (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
+
+    val dfWithColumn = df.withColumnRenamed("_1", "Time")
+      .withColumnRenamed("_2", "root.test.d0.int")
+      .withColumnRenamed("_3", "root.test.d0.long")
+      .withColumnRenamed("_4", "root.test.d0.float")
+      .withColumnRenamed("_5", "root.test.d0.double")
+      .withColumnRenamed("_6", "root.test.d0.boolean")
+      .withColumnRenamed("_7", "root.test.d0.text")
+    dfWithColumn.write.format("org.apache.iotdb.spark.db")
+      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+      .save
+
+    val result = session.executeQueryStatement("select ** from root")
+    var size = 0
+    while (result.hasNext) {
+      result.next()
+      size += 1
+    }
+    assertResult(2)(size)
+  }
+
+  test("test insert narrow data") {
+    val df = spark.createDataFrame(List(
+      (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
+      (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
+
+    val dfWithColumn = df.withColumnRenamed("_1", "Time")
+      .withColumnRenamed("_2", "device_name")
+      .withColumnRenamed("_3", "int")
+      .withColumnRenamed("_4", "long")
+      .withColumnRenamed("_5", "float")
+      .withColumnRenamed("_6", "double")
+      .withColumnRenamed("_7", "boolean")
+      .withColumnRenamed("_8", "text")
+    dfWithColumn.write.format("org.apache.iotdb.spark.db")
+      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+      .save
+
+    val result = session.executeQueryStatement("select ** from root")
+    var size = 0
+    while (result.hasNext) {
+      result.next()
+      size += 1
+    }
+    assertResult(2)(size)
+  }
+}
--
GitLab
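
A note on the `url` option used by the new write path: `DataFrameTools.insertDataFrame` derives the Session host and port by splitting the option value on `//` and `:`, so it must follow the `jdbc:iotdb://host:port/` shape used in the examples and tests above. A small, purely illustrative sketch of that parsing:

``` scala
// Illustration only: mirrors the host/port extraction in DataFrameTools.insertDataFrame.
val url = "jdbc:iotdb://127.0.0.1:6667/"
val hostPort = url.split("//")(1).replace("/", "").split(":")
val host = hostPort(0)        // "127.0.0.1"
val port = hostPort(1).toInt  // 6667
assert(host == "127.0.0.1" && port == 6667)
```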