Unverified commit 45a66586, authored by Xuan Ronaldo, committed by GitHub

[IoTDB-1989] IoTDB support insert data from Spark (#4477)

1. Fix the bug of the count statement.
2. Update the ITs of the Spark connector.
Parent 31548f49
......@@ -18,7 +18,7 @@
under the License.
-->
## Spark-IoTDB
# Spark-IoTDB
### version
The versions required for Spark and Java are as follows:
......@@ -45,8 +45,9 @@ mvn clean scala:compile compile install
</dependency>
```
## Read Data from IoTDB
#### spark-shell user guide
### spark-shell user guide
```
spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
......@@ -76,7 +77,7 @@ df.printSchema()
df.show()
```
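The hunk above shows only the tail of the spark-shell example. For orientation, a minimal read sketch looks roughly like this (the SQL statement is illustrative; in spark-shell the `spark` session already exists):

```scala
// wide-table read: the result has a Time column plus one column per time series
val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select ** from root") // illustrative query
  .load

df.printSchema()
df.show()
```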
#### Schema Inference
### Schema Inference
Take the following TsFile structure as an example: the TsFile schema contains three measurements: status, temperature, and hardware. The basic information of these three measurements is as follows:
......@@ -114,7 +115,7 @@ You can also use narrow table form which as follows: (You can see part 4 about h
| 5 | root.ln.wf02.wt01 | false | null | null |
| 6 | root.ln.wf02.wt02 | null | ccc | null |
#### Get narrow form of data
### Get narrow form of data
```
spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
......@@ -127,7 +128,7 @@ df.printSchema()
df.show()
```
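Again only the tail of the example is visible in this hunk. Based on the `Transformer` helper exercised by this commit's integration tests, a narrow-form sketch is roughly:

```scala
import org.apache.iotdb.spark.db.Transformer

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select ** from root") // illustrative query
  .load

// convert the wide result (Time + one column per series) into the narrow form
// (Time, device_name, one column per measurement)
val narrowDf = Transformer.toNarrowForm(spark, df)
narrowDf.printSchema()
narrowDf.show()
```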
#### Java user guide
### Java user guide
```
import org.apache.spark.sql.Dataset;
......@@ -158,3 +159,45 @@ public class Example {
}
```
## Write Data to IoTDB
### User Guide
``` scala
// import narrow table
val df = spark.createDataFrame(List(
(1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "device_name")
.withColumnRenamed("_3", "s0")
.withColumnRenamed("_4", "s1")
.withColumnRenamed("_5", "s2")
.withColumnRenamed("_6", "s3")
.withColumnRenamed("_7", "s4")
.withColumnRenamed("_8", "s5")
dfWithColumn
.write
.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
// import wide table
val df = spark.createDataFrame(List(
(1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "root.test.d0.s0")
.withColumnRenamed("_3", "root.test.d0.s1")
.withColumnRenamed("_4", "root.test.d0.s2")
.withColumnRenamed("_5", "root.test.d0.s3")
.withColumnRenamed("_6", "root.test.d0.s4")
.withColumnRenamed("_7", "root.test.d0.s5")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
```
### Notes
1. You can write data to IoTDB directly, whether the DataFrame contains a wide table or a narrow table.
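2. A `Time` column is required. When the DataFrame has no `device_name` column it is treated as a wide table, and every other column name must be a full series path starting with `root.`; otherwise the write is rejected (see the `createRelation` check in `DefaultSource` later in this diff).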
\ No newline at end of file
......@@ -19,9 +19,9 @@
-->
## Spark-IoTDB
# Spark-IoTDB
### Version
## Version
The versions required for Spark and Java are as follows:
......@@ -29,7 +29,7 @@ The versions required for Spark and Java are as follows:
| ------------- | ------------- | ------------ | -------- |
| `2.4.3` | `2.11` | `1.8` | `0.13.0-SNAPSHOT` |
### Installation
## Installation
mvn clean scala:compile compile install
......@@ -43,7 +43,9 @@ mvn clean scala:compile compile install
</dependency>
```
#### Spark-shell user guide
## Read Data from IoTDB
### Spark-shell user guide
```
spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
......@@ -73,7 +75,7 @@ df.printSchema()
df.show()
```
#### Schema Inference
### Schema Inference
Take the following TsFile structure as an example: the TsFile schema contains three measurements: status, temperature, and hardware. The basic information of these three measurements is as follows:
......@@ -118,7 +120,7 @@ time|d1.status|time|d1.temperature |time | d2.hardware |time|d2.status
| 5 | root.ln.wf02.wt01 | false | null | null |
| 6 | root.ln.wf02.wt02 | null | ccc | null |
#### Get narrow form of data
### Get narrow form of data
```
spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
......@@ -131,7 +133,7 @@ df.printSchema()
df.show()
```
#### Java user guide
### Java user guide
```
import org.apache.spark.sql.Dataset;
......@@ -161,3 +163,45 @@ public class Example {
}
}
```
## Write Data to IoTDB
### User Guide
``` scala
// import narrow table
val df = spark.createDataFrame(List(
(1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "device_name")
.withColumnRenamed("_3", "s0")
.withColumnRenamed("_4", "s1")
.withColumnRenamed("_5", "s2")
.withColumnRenamed("_6", "s3")
.withColumnRenamed("_7", "s4")
.withColumnRenamed("_8", "s5")
dfWithColumn
.write
.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
// import wide table
val df = spark.createDataFrame(List(
(1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "root.test.d0.s0")
.withColumnRenamed("_3", "root.test.d0.s1")
.withColumnRenamed("_4", "root.test.d0.s2")
.withColumnRenamed("_5", "root.test.d0.s3")
.withColumnRenamed("_6", "root.test.d0.s4")
.withColumnRenamed("_7", "root.test.d0.s5")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
```
### Notes
1. You can write data to IoTDB directly, whether the DataFrame contains a wide table or a narrow table.
\ No newline at end of file
......@@ -32,6 +32,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compile.version>1.8</compile.version>
<scala.library.version>2.11</scala.library.version>
<scala.version>2.11.12</scala.version>
</properties>
<dependencies>
<dependency>
......@@ -52,35 +54,10 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<!--commons-configuration uses commons-lang:commons-lang:2.4
while others use commons-lang 2.6-->
<exclusion>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- this is just used by hadoop-common-->
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
<exclusions>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
</exclusions>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>${project.version}</version>
</dependency>
<!-- many of hadoop dependencies use guava11, but org.apache.curator from hadoop-common uses
guava16 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
......@@ -97,16 +74,19 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
......
......@@ -65,7 +65,11 @@ object Converter {
}
val colCount = resultSetMetaData.getColumnCount
for (i <- 2 to colCount) {
var startIndex = 2
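// aggregation queries (e.g. count) return no Time column; in that case field parsing starts at column 1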
if (!"Time".equals(resultSetMetaData.getColumnName(1))) {
startIndex = 1
}
for (i <- startIndex to colCount) {
fields += StructField(resultSetMetaData.getColumnLabel(i), resultSetMetaData.getColumnType(i) match {
case Types.BOOLEAN => BooleanType
case Types.INTEGER => IntegerType
......
......@@ -19,11 +19,14 @@
package org.apache.iotdb.spark.db
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.apache.iotdb.spark.db.tools.DataFrameTools
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
import org.slf4j.LoggerFactory
private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegister {
private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegister with CreatableRelationProvider {
private final val logger = LoggerFactory.getLogger(classOf[DefaultSource])
override def shortName(): String = "tsfile"
......@@ -34,10 +37,27 @@ private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegis
val iotdbOptions = new IoTDBOptions(parameters)
if (iotdbOptions.url == null || iotdbOptions.sql == null) {
sys.error("IoTDB url or sql not specified")
if ("".equals(iotdbOptions.sql)) {
sys.error("sql not specified")
}
new IoTDBRelation(iotdbOptions)(sqlContext.sparkSession)
}
override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
if (!data.columns.contains("Time")) {
sys.error("No `Time` column")
}
val iotdbOptions = new IoTDBOptions(parameters)
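// a DataFrame without a device_name column is treated as wide: every non-Time column must be a
// full series path starting with "root.", and it is converted to narrow form before insertion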
if (!data.columns.contains("device_name")) {
data.columns.foreach(column => if (!column.startsWith("root.") && column != "Time") sys.error("Invalid column: " + column))
val narrowDf = Transformer.toNarrowForm(sqlContext.sparkSession, data)
DataFrameTools.insertDataFrame(iotdbOptions, narrowDf)
} else {
DataFrameTools.insertDataFrame(iotdbOptions, data)
}
new IoTDBRelation(iotdbOptions)(sqlContext.sparkSession)
}
}
\ No newline at end of file
......@@ -29,7 +29,7 @@ class IoTDBOptions(
val password = parameters.getOrElse("password", "root")
val sql = parameters.getOrElse("sql", sys.error("Option 'sql' not specified"))
val sql = parameters.getOrElse("sql", "")
val numPartition = parameters.getOrElse("numPartition", "1")
......
......@@ -60,11 +60,10 @@ class IoTDBRDD private[iotdb](
val part = split.asInstanceOf[IoTDBPartition]
var taskInfo: String = _
Option(TaskContext.get()).foreach { taskContext => {
Option(TaskContext.get()).foreach { taskContext =>
taskContext.addTaskCompletionListener { _ => conn.close() }
taskInfo = "task Id: " + taskContext.taskAttemptId() + " partition Id: " + taskContext.partitionId()
}
}
Class.forName("org.apache.iotdb.jdbc.IoTDBDriver")
val conn: Connection = DriverManager.getConnection(options.url, options.user, options.password)
......@@ -112,8 +111,8 @@ class IoTDBRDD private[iotdb](
override def hasNext: Boolean = {
if (!finished && !gotNext) {
nextValue = getNext
gotNext = true
nextValue = getNext
gotNext = true
}
!finished
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.spark.db.tools;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.spark.db.IoTDBOptions;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.thrift.annotation.Nullable;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE;
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN;
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.FLOAT;
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT32;
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT64;
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.TEXT;
public class DataFrameTools {
/**
* Insert a narrow-table dataframe into IoTDB.
*
* @param options the options used to create an IoTDB session
* @param dataFrame a dataframe in narrow-table form
*/
public static void insertDataFrame(IoTDBOptions options, Dataset<Row> dataFrame) {
List<Tuple2<String, String>> sensorTypes = new ArrayList<>(Arrays.asList(dataFrame.dtypes()));
sensorTypes.remove(0);
sensorTypes.remove(0);
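// the first two columns are Time and device_name; the remaining dtypes describe the sensor columns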
List<Row> devices = dataFrame.select("device_name").distinct().collectAsList();
Dataset<Row> repartition = dataFrame.repartition(dataFrame.col("device_name"));
for (Row row : devices) {
String device = row.get(0).toString();
repartition
.where(String.format("device_name == '%s'", device))
.foreachPartition(
partition -> {
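// derive host and port from the JDBC url, e.g. jdbc:iotdb://127.0.0.1:6667/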
String[] hostPort = options.url().split("//")[1].replace("/", "").split(":");
Session session =
new Session.Builder()
.host(hostPort[0])
.port(Integer.valueOf(hostPort[1]))
.username(String.valueOf(options.user()))
.password(String.valueOf(options.password()))
.build();
session.open();
partition.forEachRemaining(
record -> {
ArrayList<String> measurements = new ArrayList<>();
ArrayList<TSDataType> types = new ArrayList<>();
ArrayList<Object> values = new ArrayList<>();
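// columns 0 and 1 hold Time and device_name; null cells are skipped so only present measurements are inserted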
for (int i = 2; i < record.length(); i++) {
Object value = record.get(i);
if (value == null) {
continue;
}
value =
typeTrans(record.get(i).toString(), getType(sensorTypes.get(i - 2)._2));
values.add(value);
measurements.add(sensorTypes.get(i - 2)._1);
types.add(getType(sensorTypes.get(i - 2)._2));
}
try {
session.insertRecord(
record.get(1).toString(),
(Long) record.get(0),
measurements,
types,
values);
} catch (IoTDBConnectionException e) {
e.printStackTrace();
} catch (StatementExecutionException e) {
e.printStackTrace();
}
});
session.close();
});
}
}
/**
* Convert a string cell value to the Java object expected for the given TSDataType.
*
* @param value the cell value rendered as a string
* @param type the target IoTDB data type
* @return the converted value, or null if the value cannot be parsed or the type is unsupported
*/
private static Object typeTrans(String value, TSDataType type) {
try {
switch (type) {
case TEXT:
return value;
case BOOLEAN:
return Boolean.valueOf(value);
case INT32:
return Integer.valueOf(value);
case INT64:
return Long.valueOf(value);
case FLOAT:
return Float.valueOf(value);
case DOUBLE:
return Double.valueOf(value);
default:
return null;
}
} catch (NumberFormatException e) {
return null;
}
}
/**
* Map a Spark SQL type name to the corresponding IoTDB TSDataType.
*
* @param typeStr the Spark type name, e.g. "IntegerType"
* @return the matching TSDataType, or null for unsupported types
*/
private static TSDataType getType(String typeStr) {
switch (typeStr) {
case "StringType":
return TEXT;
case "BooleanType":
return BOOLEAN;
case "IntegerType":
return INT32;
case "LongType":
return INT64;
case "FloatType":
return FLOAT;
case "DoubleType":
return DOUBLE;
default:
return null;
}
}
}
......@@ -20,7 +20,6 @@
package org.apache.iotdb.spark.db
import java.io.ByteArrayOutputStream
import org.apache.iotdb.db.conf.IoTDBConstant
import org.apache.iotdb.db.service.IoTDB
import org.apache.iotdb.jdbc.Config
......@@ -70,32 +69,32 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
}
test("test show data") {
val df = spark.read.format("org.apache.iotdb.sparkdb")
.option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root.**").load
val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select ** from root").load
Assert.assertEquals(7505, df.count())
}
test("test show data with partition") {
val df = spark.read.format("org.apache.iotdb.sparkdb")
val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("sql", "select * from root.**")
.option("sql", "select ** from root")
.option("lowerBound", 1).option("upperBound", System.nanoTime() / 1000 / 1000)
.option("numPartition", 10).load
Assert.assertEquals(7505, df.count())
}
test("test filter data") {
val df = spark.read.format("org.apache.iotdb.sparkdb")
val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("sql", "select * from root.** where time < 2000 and time > 1000").load
.option("sql", "select ** from root where time < 2000 and time > 1000").load
Assert.assertEquals(499, df.count())
}
test("test filter data with partition") {
val df = spark.read.format("org.apache.iotdb.sparkdb")
val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("sql", "select * from root.** where time < 2000 and time > 1000")
.option("sql", "select ** from root where time < 2000 and time > 1000")
.option("lowerBound", 1)
.option("upperBound", 10000).option("numPartition", 10).load
......@@ -103,17 +102,17 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
}
test("test transform to narrow") {
val df = spark.read.format("org.apache.iotdb.sparkdb")
val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("sql", "select * from root.** where time < 1100 and time > 1000").load
.option("sql", "select ** from root where time < 1100 and time > 1000").load
val narrow_df = Transformer.toNarrowForm(spark, df)
Assert.assertEquals(198, narrow_df.count())
}
test("test transform to narrow with partition") {
val df = spark.read.format("org.apache.iotdb.sparkdb")
val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("sql", "select * from root.** where time < 1100 and time > 1000")
.option("sql", "select ** from root where time < 1100 and time > 1000")
.option("lowerBound", 1).option("upperBound", 10000)
.option("numPartition", 10).load
val narrow_df = Transformer.toNarrowForm(spark, df)
......@@ -121,16 +120,16 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
}
test("test transform back to wide") {
val df = spark.read.format("org.apache.iotdb.sparkdb")
val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("sql", "select * from root.** where time < 1100 and time > 1000").load
.option("sql", "select ** from root where time < 1100 and time > 1000").load
val narrow_df = Transformer.toNarrowForm(spark, df)
val wide_df = Transformer.toWideForm(spark, narrow_df)
Assert.assertEquals(99, wide_df.count())
}
test("test aggregate sql") {
val df = spark.read.format("org.apache.iotdb.sparkdb")
val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("sql", "select count(d0.s0),count(d0.s1) from root.vehicle").load
......@@ -139,7 +138,6 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
df.show(df.count.toInt, false)
}
val actual = outCapture.toByteArray.map(_.toChar)
val expect =
"+-------------------------+-------------------------+\n" +
"|count(root.vehicle.d0.s0)|count(root.vehicle.d0.s1)|\n" +
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.spark.db
import org.apache.iotdb.db.conf.IoTDBConstant
import org.apache.iotdb.db.service.IoTDB
import org.apache.iotdb.jdbc.Config
import org.apache.iotdb.session.Session
import org.apache.spark.sql.SparkSession
import org.junit.{AfterClass, Before}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
private var daemon: IoTDB = _
private var spark: SparkSession = _
private var session: Session = _
@Before
override protected def beforeAll(): Unit = {
System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/")
super.beforeAll()
EnvironmentUtils.closeStatMonitor()
daemon = IoTDB.getInstance
daemon.active()
EnvironmentUtils.envSetUp()
Class.forName(Config.JDBC_DRIVER_NAME)
spark = SparkSession
.builder()
.config("spark.master", "local")
.appName("TSFile test")
.getOrCreate()
session = new Session("127.0.0.1", 6667, "root", "root")
session.open()
}
@AfterClass
override protected def afterAll(): Unit = {
if (spark != null) {
spark.sparkContext.stop()
}
daemon.stop()
EnvironmentUtils.cleanEnv()
session.close()
super.afterAll()
}
test("test insert wide data") {
val df = spark.createDataFrame(List(
(1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "root.test.d0.int")
.withColumnRenamed("_3", "root.test.d0.long")
.withColumnRenamed("_4", "root.test.d0.float")
.withColumnRenamed("_5", "root.test.d0.double")
.withColumnRenamed("_6", "root.test.d0.boolean")
.withColumnRenamed("_7", "root.test.d0.text")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
val result = session.executeQueryStatement("select ** from root")
var size = 0
while (result.hasNext) {
  result.next() // consume the row so the cursor advances
  size += 1
}
assertResult(2)(size)
}
test("test insert narrow data") {
val df = spark.createDataFrame(List(
(1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "device_name")
.withColumnRenamed("_3", "int")
.withColumnRenamed("_4", "long")
.withColumnRenamed("_5", "float")
.withColumnRenamed("_6", "double")
.withColumnRenamed("_7", "boolean")
.withColumnRenamed("_8", "text")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
val result = session.executeQueryStatement("select ** from root")
var size = 0
while (result.hasNext) {
  result.next() // consume the row so the cursor advances
  size += 1
}
assertResult(2)(size)
}
}