diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md index 7fb84e8d2728d3c0217a4f8639e56e0e00daf11b..cb3b42c6bbad5265b1848a88464f61edff0f6d62 100644 --- a/docs/dev/batch/index.md +++ b/docs/dev/batch/index.md @@ -571,7 +571,7 @@ data.reduceGroup { elements => elements.sum } data set.

{% highlight scala %} val input: DataSet[(Int, String, Double)] = // [...] -val output: DataSet[(Int, String, Doublr)] = input.aggregate(SUM, 0).aggregate(MIN, 2); +val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2) {% endhighlight %}

You can also use short-hand syntax for minimum, maximum, and sum aggregations.
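For illustration, a minimal sketch of that short-hand form, assuming the same (Int, String, Double) input as in the aggregate example above (equivalent to aggregate(SUM, 0).aggregate(MIN, 2)):

{% highlight scala %}
val input: DataSet[(Int, String, Double)] = // [...]
// short-hand aggregation: sum of field 0, then minimum of field 2
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
{% endhighlight %}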

{% highlight scala %} @@ -1037,7 +1037,7 @@ val csvInput = env.readCsvFile[Person]( val values = env.fromElements("Foo", "bar", "foobar", "fubar") // generate a number sequence -val numbers = env.generateSequence(1, 10000000); +val numbers = env.generateSequence(1, 10000000) // read a file from the specified path of type TextInputFormat val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], @@ -1288,7 +1288,7 @@ val values: DataSet[(String, Int, Double)] = // [...] values.writeAsCsv("file:///path/to/the/result/file", "\n", "|") // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines -values.writeAsText("file:///path/to/the/result/file"); +values.writeAsText("file:///path/to/the/result/file") // this writes values as strings using a user-defined formatting values map { tuple => tuple._1 + " - " + tuple._2 } @@ -1309,19 +1309,19 @@ val pData: DataSet[(BookPojo, Double)] = // [...] val sData: DataSet[String] = // [...] // sort output on String field in ascending order -tData.sortPartition(1, Order.ASCENDING).print; +tData.sortPartition(1, Order.ASCENDING).print() // sort output on Double field in descending and Int field in ascending order -tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print; +tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print() // sort output on the "author" field of nested BookPojo in descending order -pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...); +pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...) // sort output on the full tuple in ascending order -tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...); +tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...) // sort atomic type (String) output in descending order -sData.sortPartition("_", Order.DESCENDING).writeAsText(...); +sData.sortPartition("_", Order.DESCENDING).writeAsText(...) 
{% endhighlight %} @@ -1486,7 +1486,7 @@ val result = count map { c => c / 10000.0 * 4 } result.print() -env.execute("Iterative Pi Example"); +env.execute("Iterative Pi Example") {% endhighlight %} You can also check out the @@ -1693,7 +1693,7 @@ val env = ExecutionEnvironment.createLocalEnvironment() val lines = env.readTextFile(pathToTextFile) // build your program -env.execute(); +env.execute() {% endhighlight %} diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md index b6ee63c0449f8a002b1ab9e7a26d19ed9bdb14da..8774fcbcee868e8d1fbe0f8adcda88c57a813237 100644 --- a/docs/dev/connectors/elasticsearch.md +++ b/docs/dev/connectors/elasticsearch.md @@ -159,7 +159,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc return Requests.indexRequest() .index("my-index") .type("my-type") - .source(json); + .source(json) } })) {% endhighlight %} @@ -185,7 +185,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc return Requests.indexRequest() .index("my-index") .type("my-type") - .source(json); + .source(json) } })) {% endhighlight %} @@ -298,7 +298,7 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String return Requests.indexRequest() .index("my-index") .type("my-type") - .source(json); + .source(json) } })) {% endhighlight %} diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index 6c80370db4089cdff19a5f8b8d21437d4d61d4c9..daf1903502ffadf7be42f84f5992b815be79bcfd 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -130,14 +130,14 @@ DataStream stream = env
{% highlight scala %} -val properties = new Properties(); -properties.setProperty("bootstrap.servers", "localhost:9092"); +val properties = new Properties() +properties.setProperty("bootstrap.servers", "localhost:9092") // only required for Kafka 0.8 -properties.setProperty("zookeeper.connect", "localhost:2181"); -properties.setProperty("group.id", "test"); +properties.setProperty("zookeeper.connect", "localhost:2181") +properties.setProperty("group.id", "test") stream = env .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)) - .print + .print() {% endhighlight %}
@@ -422,17 +422,17 @@ DataStream stream = env
{% highlight scala %} -val properties = new Properties(); -properties.setProperty("bootstrap.servers", "localhost:9092"); +val properties = new Properties() +properties.setProperty("bootstrap.servers", "localhost:9092") // only required for Kafka 0.8 -properties.setProperty("zookeeper.connect", "localhost:2181"); -properties.setProperty("group.id", "test"); +properties.setProperty("zookeeper.connect", "localhost:2181") +properties.setProperty("group.id", "test") -val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties); -myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); +val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties) +myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()) stream = env .addSource(myConsumer) - .print + .print() {% endhighlight %}
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index 2c8b88a6797a7142f4a0b8422bac8e806b4e6616..ff22ee038a7705a767317a36c252002d24caf12f 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -86,11 +86,11 @@ DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
{% highlight scala %} -val consumerConfig = new Properties(); -consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1"); -consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); -consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); -consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); +val consumerConfig = new Properties() +consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1") +consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") +consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST") val env = StreamExecutionEnvironment.getEnvironment @@ -295,28 +295,28 @@ simpleStringStream.addSink(kinesis);
{% highlight scala %} -val producerConfig = new Properties(); +val producerConfig = new Properties() // Required configs -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); -producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); -producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") +producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") // Optional KPL configs -producerConfig.put("AggregationMaxCount", "4294967295"); -producerConfig.put("CollectionMaxCount", "1000"); -producerConfig.put("RecordTtl", "30000"); -producerConfig.put("RequestTimeout", "6000"); -producerConfig.put("ThreadPoolSize", "15"); +producerConfig.put("AggregationMaxCount", "4294967295") +producerConfig.put("CollectionMaxCount", "1000") +producerConfig.put("RecordTtl", "30000") +producerConfig.put("RequestTimeout", "6000") +producerConfig.put("ThreadPoolSize", "15") // Switch KinesisProducer's threading model -// producerConfig.put("ThreadingModel", "PER_REQUEST"); +// producerConfig.put("ThreadingModel", "PER_REQUEST") -val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig); -kinesis.setFailOnError(true); -kinesis.setDefaultStream("kinesis_stream_name"); -kinesis.setDefaultPartition("0"); +val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig) +kinesis.setFailOnError(true) +kinesis.setDefaultStream("kinesis_stream_name") +kinesis.setDefaultPartition("0") -val simpleStringStream = ...; -simpleStringStream.addSink(kinesis); +val simpleStringStream = ... +simpleStringStream.addSink(kinesis) {% endhighlight %}
@@ -359,11 +359,11 @@ producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
{% highlight scala %} -val producerConfig = new Properties(); -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); -producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); -producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); -producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567"); +val producerConfig = new Properties() +producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") +producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") +producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567") {% endhighlight %}
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md index a563be6e726f3eb58b99419c2823eefea43863e6..e6fe32abd9596cbc0ab54dca0ca289282b8daa99 100644 --- a/docs/dev/connectors/twitter.md +++ b/docs/dev/connectors/twitter.md @@ -67,12 +67,12 @@ DataStream streamSource = env.addSource(new TwitterSource(props));
{% highlight scala %} -val props = new Properties(); -props.setProperty(TwitterSource.CONSUMER_KEY, ""); -props.setProperty(TwitterSource.CONSUMER_SECRET, ""); -props.setProperty(TwitterSource.TOKEN, ""); -props.setProperty(TwitterSource.TOKEN_SECRET, ""); -DataStream streamSource = env.addSource(new TwitterSource(props)); +val props = new Properties() +props.setProperty(TwitterSource.CONSUMER_KEY, "") +props.setProperty(TwitterSource.CONSUMER_SECRET, "") +props.setProperty(TwitterSource.TOKEN, "") +props.setProperty(TwitterSource.TOKEN_SECRET, "") +val streamSource = env.addSource(new TwitterSource(props)) {% endhighlight %}
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md index 307679d4ba656263e7b1d30da1f855a0b56c0203..6bb755edf9148bb1b1fd5c8e845feee1dd07c8e8 100644 --- a/docs/dev/datastream_api.md +++ b/docs/dev/datastream_api.md @@ -113,7 +113,7 @@ object WindowWordCount { .timeWindow(Time.seconds(5)) .sum(1) - counts.print + counts.print() env.execute("Window Stream WordCount") } diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md index 802a079a5c9fc30a06bb6664ddbfd562ed381216..acde9e48254d09b8be89eb48d037b8f6ea786c36 100644 --- a/docs/dev/event_timestamps_watermarks.md +++ b/docs/dev/event_timestamps_watermarks.md @@ -154,7 +154,7 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream: DataStream[MyEvent] = env.readFile( myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, - FilePathFilter.createDefaultFilter()); + FilePathFilter.createDefaultFilter()) val withTimestampsAndWatermarks: DataStream[MyEvent] = stream .filter( _.severity == WARNING ) @@ -240,19 +240,19 @@ public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks @@ -729,7 +729,7 @@ pattern.times(2)

By default, a relaxed internal contiguity (between subsequent events) is used. For more information on internal contiguity, see consecutive.
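As a hedged sketch of switching to strict contiguity (reusing the pattern from the example below), consecutive() can also be appended to a bounded times() quantifier:

{% highlight scala %}
// assumption: strict internal contiguity for 2 to 4 occurrences
pattern.times(2, 4).consecutive()
{% endhighlight %}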

{% highlight scala %} -pattern.times(2, 4); +pattern.times(2, 4) {% endhighlight %} @@ -765,7 +765,7 @@ pattern.oneOrMore().greedy() Pattern.begin("start").where(_.getName().equals("c")) .followedBy("middle").where(_.getName().equals("a")) .oneOrMore().consecutive() - .followedBy("end1").where(_.getName().equals("b")); + .followedBy("end1").where(_.getName().equals("b")) {% endhighlight %}

This will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B

@@ -786,7 +786,7 @@ Pattern.begin("start").where(_.getName().equals("c")) Pattern.begin("start").where(_.getName().equals("c")) .followedBy("middle").where(_.getName().equals("a")) .oneOrMore().allowCombinations() - .followedBy("end1").where(_.getName().equals("b")); + .followedBy("end1").where(_.getName().equals("b")) {% endhighlight %}

This will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B

@@ -1491,7 +1491,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outp pattern: Map[String, Iterable[Event]] => ComplexEvent() } -val timeoutResult: DataStream = result.getSideOutput(outputTag); +val timeoutResult: DataStream = result.getSideOutput(outputTag) ~~~ The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function. @@ -1510,7 +1510,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect( out.collect(ComplexEvent()) } -val timeoutResult: DataStream = result.getSideOutput(outputTag); +val timeoutResult: DataStream = result.getSideOutput(outputTag) ~~~ diff --git a/docs/dev/libs/gelly/graph_api.md b/docs/dev/libs/gelly/graph_api.md index 465c24f83893a559c1ddafa2b1dc1961e7e0a06d..f00275e1cfb7882e027ab766244cf3bec7318d9d 100644 --- a/docs/dev/libs/gelly/graph_api.md +++ b/docs/dev/libs/gelly/graph_api.md @@ -773,7 +773,7 @@ final class SelectLargeWeightNeighbors extends NeighborsFunctionWithVertexValue[ for (neighbor <- neighbors) { if (neighbor._1.getValue() > 0.5) { - out.collect(vertex, neighbor._2); + out.collect(vertex, neighbor._2) } } } diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md index 93a2c5dc6118186dc3cc7f8c249f595224f04eac..015f85a66a2827ac2dfae593d6a46f829d5a52cb 100644 --- a/docs/dev/libs/gelly/library_methods.md +++ b/docs/dev/libs/gelly/library_methods.md @@ -55,7 +55,7 @@ val graph: Graph[java.lang.Long, java.lang.Long, NullValue] = ... val verticesWithCommunity = graph.run(new LabelPropagation[java.lang.Long, java.lang.Long, NullValue](30)) // print the result -verticesWithCommunity.print +verticesWithCommunity.print() {% endhighlight %} diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md index 32945e4ac868c17ac2f5ffc88c1f1f83075dec6f..c4736381ff4f5c172b2096031c5d45144494d08f 100644 --- a/docs/dev/stream/operators/asyncio.md +++ b/docs/dev/stream/operators/asyncio.md @@ -150,7 +150,7 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] { // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future resultFuture.onSuccess { - case result: String => resultFuture.complete(Iterable((str, result))); + case result: String => resultFuture.complete(Iterable((str, result))) } } } diff --git a/docs/dev/stream/state/custom_serialization.md b/docs/dev/stream/state/custom_serialization.md index ca6b07dd00ae17c552c41bcb87686c754d961f8f..7f886d2025fddda784fdfa287dcf55a91b770c1d 100644 --- a/docs/dev/stream/state/custom_serialization.md +++ b/docs/dev/stream/state/custom_serialization.md @@ -61,7 +61,7 @@ val descriptor = new ListStateDescriptor[(String, Integer)]( new CustomTypeSerializer) ) -checkpointedState = getRuntimeContext.getListState(descriptor); +checkpointedState = getRuntimeContext.getListState(descriptor) {% endhighlight %} diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md index e5bc02414244c524cd826d602b12ba5f644f5524..ce31629ee74a1bc034b74599b6523eae70de03e3 100644 --- a/docs/dev/stream/testing.md +++ b/docs/dev/stream/testing.md @@ -247,8 +247,8 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
{% highlight scala %} -env.enableCheckpointing(500); -env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100)); +env.enableCheckpointing(500) +env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100)) {% endhighlight %}
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index 1cf2a0c348a2162e31315230346d7af70cc6a2e6..2b58a62ff65b09948c32fe7729e2903e5fff0ab0 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -131,7 +131,7 @@ val result: Table = orders .select('a.lowerCase(), 'b, 'rowtime) .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow) .groupBy('hourlyWindow, 'a) - .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount); + .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount) {% endhighlight %} @@ -355,7 +355,7 @@ Table result = orders

Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the over windows section for more details.

- {% highlight scala %} +{% highlight java %} Table orders = tableEnv.scan("Orders"); Table result = orders // define window @@ -364,8 +364,8 @@ Table result = orders .orderBy("rowtime") .preceding("UNBOUNDED_RANGE") .following("CURRENT_RANGE") - .as("w") - .select("a, b.avg over w, b.max over w, b.min over w") // sliding aggregate + .as("w")) + .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate {% endhighlight %}

Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.

@@ -448,7 +448,7 @@ val result: Table = orders preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) - .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w,) // sliding aggregate + .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate {% endhighlight %}

Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.

@@ -614,9 +614,9 @@ Table result = orders

Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names, and at least one equality join predicate must be defined through the join operator or using a where or filter operator.
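As a hedged sketch of the filter variant mentioned above (reusing the tables and field names from the example that follows), the join predicate may equivalently be supplied through filter:

{% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
// filter is equivalent to where for expressing the equality join predicate
val result = left.join(right).filter('a === 'd).select('a, 'b, 'e)
{% endhighlight %}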

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'd, 'e, 'f); -val result = left.join(right).where('a === 'd).select('a, 'b, 'e); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'd, 'e, 'f) +val result = left.join(right).where('a === 'd).select('a, 'b, 'e) {% endhighlight %} @@ -656,12 +656,12 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)

Note: Currently, only INNER time-windowed joins are supported.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime); -val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime) +val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime) val result = left.join(right) .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes) - .select('a, 'b, 'e, 'ltime); + .select('a, 'b, 'e, 'ltime) {% endhighlight %} @@ -856,9 +856,9 @@ Table result = left.select("a, b, c").where("a.in(RightTable)");

Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a, 'b, 'c); -val result = left.union(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a, 'b, 'c) +val result = left.union(right) {% endhighlight %} @@ -872,9 +872,9 @@ val result = left.union(right);

Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a, 'b, 'c); -val result = left.unionAll(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a, 'b, 'c) +val result = left.unionAll(right) {% endhighlight %} @@ -887,9 +887,9 @@ val result = left.unionAll(right);

Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'e, 'f, 'g); -val result = left.intersect(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'e, 'f, 'g) +val result = left.intersect(right) {% endhighlight %} @@ -902,9 +902,9 @@ val result = left.intersect(right);

Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'e, 'f, 'g); -val result = left.intersectAll(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'e, 'f, 'g) +val result = left.intersectAll(right) {% endhighlight %} @@ -917,9 +917,9 @@ val result = left.intersectAll(right);

Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a, 'b, 'c); -val result = left.minus(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a, 'b, 'c) +val result = left.minus(right) {% endhighlight %} @@ -932,9 +932,9 @@ val result = left.minus(right);

Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a, 'b, 'c); -val result = left.minusAll(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a, 'b, 'c) +val result = left.minusAll(right) {% endhighlight %} @@ -947,9 +947,9 @@ val result = left.minusAll(right);

Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a); -val result = left.select('a, 'b, 'c).where('a.in(right)); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a) +val result = left.select('a, 'b, 'c).where('a.in(right)) {% endhighlight %} @@ -1030,8 +1030,8 @@ Table result3 = in.orderBy("a.asc").offset(10).fetch(5);

Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.

{% highlight scala %} -val in = ds.toTable(tableEnv, 'a, 'b, 'c); -val result = in.orderBy('a.asc); +val in = ds.toTable(tableEnv, 'a, 'b, 'c) +val result = in.orderBy('a.asc) {% endhighlight %} diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md index 71567d880564c712ca8cbbdd549b224ef2b9a8a0..0e0930211b171d7237bfc6517c8d1cd6517ec772 100644 --- a/docs/dev/table/udfs.md +++ b/docs/dev/table/udfs.md @@ -93,7 +93,7 @@ myTable.select('string, hashCode('string)) // register and use the function in SQL tableEnv.registerFunction("hashCode", new HashCode(10)) -tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); +tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable") {% endhighlight %} @@ -198,17 +198,17 @@ val myTable = ... // table schema: [a: String] // Use the table function in the Scala Table API (Note: No registration required in Scala Table API). val split = new Split("#") // "as" specifies the field names of the generated table. -myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length); -myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length); +myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length) +myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length) // Register the table function to use it in SQL queries. tableEnv.registerFunction("split", new Split("#")) // Use the table function in SQL with LATERAL and TABLE keywords. // CROSS JOIN a table function (equivalent to "join" in Table API) -tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); +tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)") // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API) -tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE"); +tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE") {% endhighlight %} **IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues. @@ -723,7 +723,7 @@ tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); {% highlight scala %} object hashCode extends ScalarFunction { - var hashcode_factor = 12; + var hashcode_factor = 12 override def open(context: FunctionContext): Unit = { // access "hashcode_factor" parameter @@ -743,7 +743,7 @@ myTable.select('string, hashCode('string)) // register and use the function in SQL tableEnv.registerFunction("hashCode", hashCode) -tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); +tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable") {% endhighlight %}