未验证 提交 581d34f5 编写于 作者: X Xiangdong Huang 提交者: GitHub

Fix the Flink IoTDB example, which wrote data with incorrect data types (#2181)

上级 20fed99e
......@@ -54,5 +54,30 @@
<artifactId>flink-tsfile-connector</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.library.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<!-- hive-serde 2.8.4 uses orc-core 1.3.2,
which is under incompatible license. So, exclude it.-->
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
......@@ -25,6 +25,9 @@ import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
public class FlinkIoTDBSink {
public static void main(String[] args) throws Exception {
......@@ -37,19 +40,25 @@ public class FlinkIoTDBSink {
options.setUser("root");
options.setPassword("root");
options.setStorageGroup("root.sg");
options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.d1.s1")));
//If the server enables auto_create_schema, then we do not need to register all timeseries here.
options.setTimeseriesOptionList(
Lists.newArrayList(new IoTDBOptions.TimeseriesOption(
"root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY
)));
IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema)
// enable batching
.withBatchSize(10);
.withBatchSize(10)
// how many connections to the server will be created for each parallelism
.withSessionPoolSize(3);
env.addSource(new SensorSource())
.name("sensor-source")
.setParallelism(1)
.addSink(ioTDBSink)
.name("iotdb-sink")
.setParallelism(1);
.name("iotdb-sink");
env.execute("iotdb-flink-example");
}
......
......@@ -104,6 +104,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.assembly.version>3.1.0</maven.assembly.version>
<scala.library.version>2.11</scala.library.version>
<scala.version>2.11.12</scala.version>
<hadoop2.version>2.7.3</hadoop2.version>
<hive2.version>2.3.6</hive2.version>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册