diff --git a/example/flink/pom.xml b/example/flink/pom.xml index 398c789b5b79f52e792c2985e0a7bced83837f41..31220fd8114f0c31a0ea8033d4a369639e8e0af2 100644 --- a/example/flink/pom.xml +++ b/example/flink/pom.xml @@ -54,5 +54,30 @@ flink-tsfile-connector ${project.version} + + org.apache.flink + flink-clients_${scala.library.version} + ${flink.version} + + + + org.apache.orc + orc-core + + + org.apache.hive + hive-storage-api + + + com.google.guava + guava + + + org.apache.commons + commons-compress + + + diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java index 35e2143e89268a2053cf9955a1c7511bbbab4964..87b06a632b4746a2b2ab4a23f7d5c4ca7b10b626 100644 --- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java +++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java @@ -25,6 +25,9 @@ import java.security.SecureRandom; import java.util.HashMap; import java.util.Map; import java.util.Random; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; public class FlinkIoTDBSink { public static void main(String[] args) throws Exception { @@ -37,19 +40,25 @@ public class FlinkIoTDBSink { options.setUser("root"); options.setPassword("root"); options.setStorageGroup("root.sg"); - options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.d1.s1"))); + + //If the server enables auto_create_schema, then we do not need to register all timeseries here. + options.setTimeseriesOptionList( + Lists.newArrayList(new IoTDBOptions.TimeseriesOption( + "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY + ))); IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema(); IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema) // enable batching - .withBatchSize(10); + .withBatchSize(10) + // how many connectons to the server will be created for each parallelism + .withSessionPoolSize(3); env.addSource(new SensorSource()) .name("sensor-source") .setParallelism(1) .addSink(ioTDBSink) - .name("iotdb-sink") - .setParallelism(1); + .name("iotdb-sink"); env.execute("iotdb-flink-example"); } diff --git a/pom.xml b/pom.xml index 1cf788da80f9ce967d3ef23606ef125829107131..11e035ed3467e6aed04e0325f9fda45bc1d1c1cc 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ 1.8 1.8 3.1.0 + 2.11 2.11.12 2.7.3 2.3.6