From ae925b63f25dce989766dbbc155abfbb0e7c992c Mon Sep 17 00:00:00 2001 From: okumin Date: Mon, 1 Jan 2018 15:25:54 +0900 Subject: [PATCH] [hotfix] [docs] Fix Scala code snippets in docs. * remove unneeded semi-colons * add `()` to `print` method * typically, methods with some side-effects are invoked with `()` * fix a few misc issues This closes #5221. --- docs/dev/batch/index.md | 20 +++--- docs/dev/connectors/elasticsearch.md | 6 +- docs/dev/connectors/kafka.md | 24 +++---- docs/dev/connectors/kinesis.md | 52 +++++++------- docs/dev/connectors/twitter.md | 12 ++-- docs/dev/datastream_api.md | 2 +- docs/dev/event_timestamps_watermarks.md | 12 ++-- docs/dev/libs/cep.md | 30 ++++---- docs/dev/libs/gelly/graph_api.md | 2 +- docs/dev/libs/gelly/library_methods.md | 2 +- docs/dev/stream/operators/asyncio.md | 2 +- docs/dev/stream/state/custom_serialization.md | 2 +- docs/dev/stream/testing.md | 4 +- docs/dev/table/tableApi.md | 68 +++++++++---------- docs/dev/table/udfs.md | 14 ++-- 15 files changed, 126 insertions(+), 126 deletions(-) diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md index 7fb84e8d272..cb3b42c6bba 100644 --- a/docs/dev/batch/index.md +++ b/docs/dev/batch/index.md @@ -571,7 +571,7 @@ data.reduceGroup { elements => elements.sum } data set.

{% highlight scala %} val input: DataSet[(Int, String, Double)] = // [...] -val output: DataSet[(Int, String, Doublr)] = input.aggregate(SUM, 0).aggregate(MIN, 2); +val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2) {% endhighlight %}

You can also use short-hand syntax for minimum, maximum, and sum aggregations.
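
A minimal sketch of the shorthand form, assuming the same `(Int, String, Double)` input as in the snippet above (an illustrative example, equivalent to chaining `aggregate` calls):

{% highlight scala %}
val input: DataSet[(Int, String, Double)] = // [...]
// shorthand aggregations: sum field 0, then take the minimum of field 2
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
{% endhighlight %}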

{% highlight scala %} @@ -1037,7 +1037,7 @@ val csvInput = env.readCsvFile[Person]( val values = env.fromElements("Foo", "bar", "foobar", "fubar") // generate a number sequence -val numbers = env.generateSequence(1, 10000000); +val numbers = env.generateSequence(1, 10000000) // read a file from the specified path of type TextInputFormat val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], @@ -1288,7 +1288,7 @@ val values: DataSet[(String, Int, Double)] = // [...] values.writeAsCsv("file:///path/to/the/result/file", "\n", "|") // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines -values.writeAsText("file:///path/to/the/result/file"); +values.writeAsText("file:///path/to/the/result/file") // this writes values as strings using a user-defined formatting values map { tuple => tuple._1 + " - " + tuple._2 } @@ -1309,19 +1309,19 @@ val pData: DataSet[(BookPojo, Double)] = // [...] val sData: DataSet[String] = // [...] // sort output on String field in ascending order -tData.sortPartition(1, Order.ASCENDING).print; +tData.sortPartition(1, Order.ASCENDING).print() // sort output on Double field in descending and Int field in ascending order -tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print; +tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print() // sort output on the "author" field of nested BookPojo in descending order -pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...); +pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...) // sort output on the full tuple in ascending order -tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...); +tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...) // sort atomic type (String) output in descending order -sData.sortPartition("_", Order.DESCENDING).writeAsText(...); +sData.sortPartition("_", Order.DESCENDING).writeAsText(...) {% endhighlight %} @@ -1486,7 +1486,7 @@ val result = count map { c => c / 10000.0 * 4 } result.print() -env.execute("Iterative Pi Example"); +env.execute("Iterative Pi Example") {% endhighlight %} You can also check out the @@ -1693,7 +1693,7 @@ val env = ExecutionEnvironment.createLocalEnvironment() val lines = env.readTextFile(pathToTextFile) // build your program -env.execute(); +env.execute() {% endhighlight %} diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md index b6ee63c0449..8774fcbcee8 100644 --- a/docs/dev/connectors/elasticsearch.md +++ b/docs/dev/connectors/elasticsearch.md @@ -159,7 +159,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc return Requests.indexRequest() .index("my-index") .type("my-type") - .source(json); + .source(json) } })) {% endhighlight %} @@ -185,7 +185,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc return Requests.indexRequest() .index("my-index") .type("my-type") - .source(json); + .source(json) } })) {% endhighlight %} @@ -298,7 +298,7 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String return Requests.indexRequest() .index("my-index") .type("my-type") - .source(json); + .source(json) } })) {% endhighlight %} diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index 6c80370db40..daf1903502f 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -130,14 +130,14 @@ DataStream stream = env
{% highlight scala %} -val properties = new Properties(); -properties.setProperty("bootstrap.servers", "localhost:9092"); +val properties = new Properties() +properties.setProperty("bootstrap.servers", "localhost:9092") // only required for Kafka 0.8 -properties.setProperty("zookeeper.connect", "localhost:2181"); -properties.setProperty("group.id", "test"); +properties.setProperty("zookeeper.connect", "localhost:2181") +properties.setProperty("group.id", "test") stream = env .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)) - .print + .print() {% endhighlight %}
@@ -422,17 +422,17 @@ DataStream stream = env
{% highlight scala %} -val properties = new Properties(); -properties.setProperty("bootstrap.servers", "localhost:9092"); +val properties = new Properties() +properties.setProperty("bootstrap.servers", "localhost:9092") // only required for Kafka 0.8 -properties.setProperty("zookeeper.connect", "localhost:2181"); -properties.setProperty("group.id", "test"); +properties.setProperty("zookeeper.connect", "localhost:2181") +properties.setProperty("group.id", "test") -val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties); -myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); +val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties) +myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()) stream = env .addSource(myConsumer) - .print + .print() {% endhighlight %}
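
`CustomWatermarkEmitter` above is not defined in the snippet; a possible shape for it (an illustrative assumption, not part of the Kafka connector) is a periodic assigner that extracts a timestamp from each record and emits watermarks trailing the highest timestamp seen so far. The `"<timestamp>,<payload>"` record layout below is likewise assumed:

{% highlight scala %}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical emitter: parses the event time from a "<timestamp>,<payload>" record and
// emits watermarks that lag the maximum observed timestamp by one millisecond.
class CustomWatermarkEmitter extends AssignerWithPeriodicWatermarks[String] {

  private var currentMaxTimestamp: Long = Long.MinValue + 1

  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
    val timestamp = element.split(",")(0).toLong
    currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp)
    timestamp
  }

  override def getCurrentWatermark(): Watermark = new Watermark(currentMaxTimestamp - 1)
}
{% endhighlight %}

With such an assigner passed to `assignTimestampsAndWatermarks`, the consumer generates timestamps and watermarks internally, per Kafka partition.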
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index 2c8b88a6797..ff22ee038a7 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -86,11 +86,11 @@ DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
{% highlight scala %} -val consumerConfig = new Properties(); -consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1"); -consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); -consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); -consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); +val consumerConfig = new Properties() +consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1") +consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") +consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST") val env = StreamExecutionEnvironment.getEnvironment @@ -295,28 +295,28 @@ simpleStringStream.addSink(kinesis);
{% highlight scala %} -val producerConfig = new Properties(); +val producerConfig = new Properties() // Required configs -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); -producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); -producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") +producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") // Optional KPL configs -producerConfig.put("AggregationMaxCount", "4294967295"); -producerConfig.put("CollectionMaxCount", "1000"); -producerConfig.put("RecordTtl", "30000"); -producerConfig.put("RequestTimeout", "6000"); -producerConfig.put("ThreadPoolSize", "15"); +producerConfig.put("AggregationMaxCount", "4294967295") +producerConfig.put("CollectionMaxCount", "1000") +producerConfig.put("RecordTtl", "30000") +producerConfig.put("RequestTimeout", "6000") +producerConfig.put("ThreadPoolSize", "15") // Switch KinesisProducer's threading model -// producerConfig.put("ThreadingModel", "PER_REQUEST"); +// producerConfig.put("ThreadingModel", "PER_REQUEST") -val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig); -kinesis.setFailOnError(true); -kinesis.setDefaultStream("kinesis_stream_name"); -kinesis.setDefaultPartition("0"); +val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig) +kinesis.setFailOnError(true) +kinesis.setDefaultStream("kinesis_stream_name") +kinesis.setDefaultPartition("0") -val simpleStringStream = ...; -simpleStringStream.addSink(kinesis); +val simpleStringStream = ... +simpleStringStream.addSink(kinesis) {% endhighlight %}
@@ -359,11 +359,11 @@ producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
{% highlight scala %} -val producerConfig = new Properties(); -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); -producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); -producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); -producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567"); +val producerConfig = new Properties() +producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") +producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") +producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567") {% endhighlight %}
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md index a563be6e726..e6fe32abd95 100644 --- a/docs/dev/connectors/twitter.md +++ b/docs/dev/connectors/twitter.md @@ -67,12 +67,12 @@ DataStream streamSource = env.addSource(new TwitterSource(props));
{% highlight scala %} -val props = new Properties(); -props.setProperty(TwitterSource.CONSUMER_KEY, ""); -props.setProperty(TwitterSource.CONSUMER_SECRET, ""); -props.setProperty(TwitterSource.TOKEN, ""); -props.setProperty(TwitterSource.TOKEN_SECRET, ""); -DataStream streamSource = env.addSource(new TwitterSource(props)); +val props = new Properties() +props.setProperty(TwitterSource.CONSUMER_KEY, "") +props.setProperty(TwitterSource.CONSUMER_SECRET, "") +props.setProperty(TwitterSource.TOKEN, "") +props.setProperty(TwitterSource.TOKEN_SECRET, "") +val streamSource = env.addSource(new TwitterSource(props)) {% endhighlight %}
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md index 307679d4ba6..6bb755edf91 100644 --- a/docs/dev/datastream_api.md +++ b/docs/dev/datastream_api.md @@ -113,7 +113,7 @@ object WindowWordCount { .timeWindow(Time.seconds(5)) .sum(1) - counts.print + counts.print() env.execute("Window Stream WordCount") } diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md index 802a079a5c9..acde9e48254 100644 --- a/docs/dev/event_timestamps_watermarks.md +++ b/docs/dev/event_timestamps_watermarks.md @@ -154,7 +154,7 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream: DataStream[MyEvent] = env.readFile( myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, - FilePathFilter.createDefaultFilter()); + FilePathFilter.createDefaultFilter()) val withTimestampsAndWatermarks: DataStream[MyEvent] = stream .filter( _.severity == WARNING ) @@ -240,19 +240,19 @@ public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks @@ -729,7 +729,7 @@ pattern.times(2)

By default, a relaxed internal contiguity (between subsequent events) is used. For more info on internal contiguity, see consecutive.

{% highlight scala %} -pattern.times(2, 4); +pattern.times(2, 4) {% endhighlight %} @@ -765,7 +765,7 @@ pattern.oneOrMore().greedy() Pattern.begin("start").where(_.getName().equals("c")) .followedBy("middle").where(_.getName().equals("a")) .oneOrMore().consecutive() - .followedBy("end1").where(_.getName().equals("b")); + .followedBy("end1").where(_.getName().equals("b")) {% endhighlight %}

Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B

@@ -786,7 +786,7 @@ Pattern.begin("start").where(_.getName().equals("c")) Pattern.begin("start").where(_.getName().equals("c")) .followedBy("middle").where(_.getName().equals("a")) .oneOrMore().allowCombinations() - .followedBy("end1").where(_.getName().equals("b")); + .followedBy("end1").where(_.getName().equals("b")) {% endhighlight %}

Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B

@@ -1491,7 +1491,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outp pattern: Map[String, Iterable[Event]] => ComplexEvent() } -val timeoutResult: DataStream = result.getSideOutput(outputTag); +val timeoutResult: DataStream = result.getSideOutput(outputTag) ~~~ The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function. @@ -1510,7 +1510,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect( out.collect(ComplexEvent()) } -val timeoutResult: DataStream = result.getSideOutput(outputTag); +val timeoutResult: DataStream = result.getSideOutput(outputTag) ~~~ diff --git a/docs/dev/libs/gelly/graph_api.md b/docs/dev/libs/gelly/graph_api.md index 465c24f8389..f00275e1cfb 100644 --- a/docs/dev/libs/gelly/graph_api.md +++ b/docs/dev/libs/gelly/graph_api.md @@ -773,7 +773,7 @@ final class SelectLargeWeightNeighbors extends NeighborsFunctionWithVertexValue[ for (neighbor <- neighbors) { if (neighbor._1.getValue() > 0.5) { - out.collect(vertex, neighbor._2); + out.collect(vertex, neighbor._2) } } } diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md index 93a2c5dc611..015f85a66a2 100644 --- a/docs/dev/libs/gelly/library_methods.md +++ b/docs/dev/libs/gelly/library_methods.md @@ -55,7 +55,7 @@ val graph: Graph[java.lang.Long, java.lang.Long, NullValue] = ... val verticesWithCommunity = graph.run(new LabelPropagation[java.lang.Long, java.lang.Long, NullValue](30)) // print the result -verticesWithCommunity.print +verticesWithCommunity.print() {% endhighlight %} diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md index 32945e4ac86..c4736381ff4 100644 --- a/docs/dev/stream/operators/asyncio.md +++ b/docs/dev/stream/operators/asyncio.md @@ -150,7 +150,7 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] { // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future resultFuture.onSuccess { - case result: String => resultFuture.complete(Iterable((str, result))); + case result: String => resultFuture.complete(Iterable((str, result))) } } } diff --git a/docs/dev/stream/state/custom_serialization.md b/docs/dev/stream/state/custom_serialization.md index ca6b07dd00a..7f886d2025f 100644 --- a/docs/dev/stream/state/custom_serialization.md +++ b/docs/dev/stream/state/custom_serialization.md @@ -61,7 +61,7 @@ val descriptor = new ListStateDescriptor[(String, Integer)]( new CustomTypeSerializer) ) -checkpointedState = getRuntimeContext.getListState(descriptor); +checkpointedState = getRuntimeContext.getListState(descriptor) {% endhighlight %} diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md index e5bc0241424..ce31629ee74 100644 --- a/docs/dev/stream/testing.md +++ b/docs/dev/stream/testing.md @@ -247,8 +247,8 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
{% highlight scala %} -env.enableCheckpointing(500); -env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100)); +env.enableCheckpointing(500) +env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100)) {% endhighlight %}
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index 1cf2a0c348a..2b58a62ff65 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -131,7 +131,7 @@ val result: Table = orders .select('a.lowerCase(), 'b, 'rowtime) .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow) .groupBy('hourlyWindow, 'a) - .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount); + .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount) {% endhighlight %} @@ -355,7 +355,7 @@ Table result = orders

Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the over windows section for more details.

- {% highlight scala %} +{% highlight java %} Table orders = tableEnv.scan("Orders"); Table result = orders // define window @@ -364,8 +364,8 @@ Table result = orders .orderBy("rowtime") .preceding("UNBOUNDED_RANGE") .following("CURRENT_RANGE") - .as("w") - .select("a, b.avg over w, b.max over w, b.min over w") // sliding aggregate + .as("w")) + .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate {% endhighlight %}

Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.

@@ -448,7 +448,7 @@ val result: Table = orders preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) - .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w,) // sliding aggregate + .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate {% endhighlight %}

Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.

@@ -614,9 +614,9 @@ Table result = orders

Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names, and at least one equality join predicate must be defined through the join operator or using a where or filter operator.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'd, 'e, 'f); -val result = left.join(right).where('a === 'd).select('a, 'b, 'e); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'd, 'e, 'f) +val result = left.join(right).where('a === 'd).select('a, 'b, 'e) {% endhighlight %} @@ -656,12 +656,12 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)

Note: Currently, only INNER time-windowed joins are supported.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime); -val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime) +val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime) val result = left.join(right) .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes) - .select('a, 'b, 'e, 'ltime); + .select('a, 'b, 'e, 'ltime) {% endhighlight %} @@ -856,9 +856,9 @@ Table result = left.select("a, b, c").where("a.in(RightTable)");

Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a, 'b, 'c); -val result = left.union(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a, 'b, 'c) +val result = left.union(right) {% endhighlight %} @@ -872,9 +872,9 @@ val result = left.union(right);

Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a, 'b, 'c); -val result = left.unionAll(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a, 'b, 'c) +val result = left.unionAll(right) {% endhighlight %} @@ -887,9 +887,9 @@ val result = left.unionAll(right);

Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'e, 'f, 'g); -val result = left.intersect(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'e, 'f, 'g) +val result = left.intersect(right) {% endhighlight %} @@ -902,9 +902,9 @@ val result = left.intersect(right);

Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'e, 'f, 'g); -val result = left.intersectAll(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'e, 'f, 'g) +val result = left.intersectAll(right) {% endhighlight %} @@ -917,9 +917,9 @@ val result = left.intersectAll(right);

Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a, 'b, 'c); -val result = left.minus(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a, 'b, 'c) +val result = left.minus(right) {% endhighlight %} @@ -932,9 +932,9 @@ val result = left.minus(right);

Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a, 'b, 'c); -val result = left.minusAll(right); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a, 'b, 'c) +val result = left.minusAll(right) {% endhighlight %} @@ -947,9 +947,9 @@ val result = left.minusAll(right);

Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.

{% highlight scala %} -val left = ds1.toTable(tableEnv, 'a, 'b, 'c); -val right = ds2.toTable(tableEnv, 'a); -val result = left.select('a, 'b, 'c).where('a.in(right)); +val left = ds1.toTable(tableEnv, 'a, 'b, 'c) +val right = ds2.toTable(tableEnv, 'a) +val result = left.select('a, 'b, 'c).where('a.in(right)) {% endhighlight %} @@ -1030,8 +1030,8 @@ Table result3 = in.orderBy("a.asc").offset(10).fetch(5);

Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.

{% highlight scala %} -val in = ds.toTable(tableEnv, 'a, 'b, 'c); -val result = in.orderBy('a.asc); +val in = ds.toTable(tableEnv, 'a, 'b, 'c) +val result = in.orderBy('a.asc) {% endhighlight %} diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md index 71567d88056..0e0930211b1 100644 --- a/docs/dev/table/udfs.md +++ b/docs/dev/table/udfs.md @@ -93,7 +93,7 @@ myTable.select('string, hashCode('string)) // register and use the function in SQL tableEnv.registerFunction("hashCode", new HashCode(10)) -tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); +tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable") {% endhighlight %} @@ -198,17 +198,17 @@ val myTable = ... // table schema: [a: String] // Use the table function in the Scala Table API (Note: No registration required in Scala Table API). val split = new Split("#") // "as" specifies the field names of the generated table. -myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length); -myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length); +myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length) +myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length) // Register the table function to use it in SQL queries. tableEnv.registerFunction("split", new Split("#")) // Use the table function in SQL with LATERAL and TABLE keywords. // CROSS JOIN a table function (equivalent to "join" in Table API) -tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); +tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)") // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API) -tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE"); +tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE") {% endhighlight %} **IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues. @@ -723,7 +723,7 @@ tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); {% highlight scala %} object hashCode extends ScalarFunction { - var hashcode_factor = 12; + var hashcode_factor = 12 override def open(context: FunctionContext): Unit = { // access "hashcode_factor" parameter @@ -743,7 +743,7 @@ myTable.select('string, hashCode('string)) // register and use the function in SQL tableEnv.registerFunction("hashCode", hashCode) -tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); +tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable") {% endhighlight %} -- GitLab