提交 ae925b63 · 作者: okumin · 提交者: Fabian Hueske

[hotfix] [docs] Fix Scala code snippets in docs.

* remove unneeded semi-colons
* add `()` to `print` method
    * typically, methods with some side-effects are invoked with `()`
* fix a few misc issues

This closes #5221.
上级 473112ce
...@@ -571,7 +571,7 @@ data.reduceGroup { elements => elements.sum } ...@@ -571,7 +571,7 @@ data.reduceGroup { elements => elements.sum }
data set.</p> data set.</p>
{% highlight scala %} {% highlight scala %}
val input: DataSet[(Int, String, Double)] = // [...] val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Doublr)] = input.aggregate(SUM, 0).aggregate(MIN, 2); val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
{% endhighlight %} {% endhighlight %}
<p>You can also use short-hand syntax for minimum, maximum, and sum aggregations.</p> <p>You can also use short-hand syntax for minimum, maximum, and sum aggregations.</p>
{% highlight scala %} {% highlight scala %}
...@@ -1037,7 +1037,7 @@ val csvInput = env.readCsvFile[Person]( ...@@ -1037,7 +1037,7 @@ val csvInput = env.readCsvFile[Person](
val values = env.fromElements("Foo", "bar", "foobar", "fubar") val values = env.fromElements("Foo", "bar", "foobar", "fubar")
// generate a number sequence // generate a number sequence
val numbers = env.generateSequence(1, 10000000); val numbers = env.generateSequence(1, 10000000)
// read a file from the specified path of type TextInputFormat // read a file from the specified path of type TextInputFormat
val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable],
...@@ -1288,7 +1288,7 @@ val values: DataSet[(String, Int, Double)] = // [...] ...@@ -1288,7 +1288,7 @@ val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|") values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file"); values.writeAsText("file:///path/to/the/result/file")
// this writes values as strings using a user-defined formatting // this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 } values map { tuple => tuple._1 + " - " + tuple._2 }
...@@ -1309,19 +1309,19 @@ val pData: DataSet[(BookPojo, Double)] = // [...] ...@@ -1309,19 +1309,19 @@ val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...] val sData: DataSet[String] = // [...]
// sort output on String field in ascending order // sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print; tData.sortPartition(1, Order.ASCENDING).print()
// sort output on Double field in descending and Int field in ascending order // sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print; tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()
// sort output on the "author" field of nested BookPojo in descending order // sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...); pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)
// sort output on the full tuple in ascending order // sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...); tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)
// sort atomic type (String) output in descending order // sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...); sData.sortPartition("_", Order.DESCENDING).writeAsText(...)
{% endhighlight %} {% endhighlight %}
...@@ -1486,7 +1486,7 @@ val result = count map { c => c / 10000.0 * 4 } ...@@ -1486,7 +1486,7 @@ val result = count map { c => c / 10000.0 * 4 }
result.print() result.print()
env.execute("Iterative Pi Example"); env.execute("Iterative Pi Example")
{% endhighlight %} {% endhighlight %}
You can also check out the You can also check out the
...@@ -1693,7 +1693,7 @@ val env = ExecutionEnvironment.createLocalEnvironment() ...@@ -1693,7 +1693,7 @@ val env = ExecutionEnvironment.createLocalEnvironment()
val lines = env.readTextFile(pathToTextFile) val lines = env.readTextFile(pathToTextFile)
// build your program // build your program
env.execute(); env.execute()
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
......
...@@ -159,7 +159,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc ...@@ -159,7 +159,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
return Requests.indexRequest() return Requests.indexRequest()
.index("my-index") .index("my-index")
.type("my-type") .type("my-type")
.source(json); .source(json)
} }
})) }))
{% endhighlight %} {% endhighlight %}
...@@ -185,7 +185,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc ...@@ -185,7 +185,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
return Requests.indexRequest() return Requests.indexRequest()
.index("my-index") .index("my-index")
.type("my-type") .type("my-type")
.source(json); .source(json)
} }
})) }))
{% endhighlight %} {% endhighlight %}
...@@ -298,7 +298,7 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String ...@@ -298,7 +298,7 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
return Requests.indexRequest() return Requests.indexRequest()
.index("my-index") .index("my-index")
.type("my-type") .type("my-type")
.source(json); .source(json)
} }
})) }))
{% endhighlight %} {% endhighlight %}
......
...@@ -130,14 +130,14 @@ DataStream<String> stream = env ...@@ -130,14 +130,14 @@ DataStream<String> stream = env
</div> </div>
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
{% highlight scala %} {% highlight scala %}
val properties = new Properties(); val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8 // only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test"); properties.setProperty("group.id", "test")
stream = env stream = env
.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)) .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
.print .print()
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
...@@ -422,17 +422,17 @@ DataStream<String> stream = env ...@@ -422,17 +422,17 @@ DataStream<String> stream = env
</div> </div>
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
{% highlight scala %} {% highlight scala %}
val properties = new Properties(); val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8 // only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test"); properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties); val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
stream = env stream = env
.addSource(myConsumer) .addSource(myConsumer)
.print .print()
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
......
...@@ -86,11 +86,11 @@ DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>( ...@@ -86,11 +86,11 @@ DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
</div> </div>
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
{% highlight scala %} {% highlight scala %}
val consumerConfig = new Properties(); val consumerConfig = new Properties()
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1"); consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
val env = StreamExecutionEnvironment.getExecutionEnvironment val env = StreamExecutionEnvironment.getExecutionEnvironment
...@@ -295,28 +295,28 @@ simpleStringStream.addSink(kinesis); ...@@ -295,28 +295,28 @@ simpleStringStream.addSink(kinesis);
</div> </div>
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
{% highlight scala %} {% highlight scala %}
val producerConfig = new Properties(); val producerConfig = new Properties()
// Required configs // Required configs
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
// Optional KPL configs // Optional KPL configs
producerConfig.put("AggregationMaxCount", "4294967295"); producerConfig.put("AggregationMaxCount", "4294967295")
producerConfig.put("CollectionMaxCount", "1000"); producerConfig.put("CollectionMaxCount", "1000")
producerConfig.put("RecordTtl", "30000"); producerConfig.put("RecordTtl", "30000")
producerConfig.put("RequestTimeout", "6000"); producerConfig.put("RequestTimeout", "6000")
producerConfig.put("ThreadPoolSize", "15"); producerConfig.put("ThreadPoolSize", "15")
// Switch KinesisProducer's threading model // Switch KinesisProducer's threading model
// producerConfig.put("ThreadingModel", "PER_REQUEST"); // producerConfig.put("ThreadingModel", "PER_REQUEST")
val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig); val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
kinesis.setFailOnError(true); kinesis.setFailOnError(true)
kinesis.setDefaultStream("kinesis_stream_name"); kinesis.setDefaultStream("kinesis_stream_name")
kinesis.setDefaultPartition("0"); kinesis.setDefaultPartition("0")
val simpleStringStream = ...; val simpleStringStream = ...
simpleStringStream.addSink(kinesis); simpleStringStream.addSink(kinesis)
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
...@@ -359,11 +359,11 @@ producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567"); ...@@ -359,11 +359,11 @@ producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
</div> </div>
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
{% highlight scala %} {% highlight scala %}
val producerConfig = new Properties(); val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567"); producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
......
...@@ -67,12 +67,12 @@ DataStream<String> streamSource = env.addSource(new TwitterSource(props)); ...@@ -67,12 +67,12 @@ DataStream<String> streamSource = env.addSource(new TwitterSource(props));
</div> </div>
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
{% highlight scala %} {% highlight scala %}
val props = new Properties(); val props = new Properties()
props.setProperty(TwitterSource.CONSUMER_KEY, ""); props.setProperty(TwitterSource.CONSUMER_KEY, "")
props.setProperty(TwitterSource.CONSUMER_SECRET, ""); props.setProperty(TwitterSource.CONSUMER_SECRET, "")
props.setProperty(TwitterSource.TOKEN, ""); props.setProperty(TwitterSource.TOKEN, "")
props.setProperty(TwitterSource.TOKEN_SECRET, ""); props.setProperty(TwitterSource.TOKEN_SECRET, "")
DataStream<String> streamSource = env.addSource(new TwitterSource(props)); val streamSource = env.addSource(new TwitterSource(props))
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
......
...@@ -113,7 +113,7 @@ object WindowWordCount { ...@@ -113,7 +113,7 @@ object WindowWordCount {
.timeWindow(Time.seconds(5)) .timeWindow(Time.seconds(5))
.sum(1) .sum(1)
counts.print counts.print()
env.execute("Window Stream WordCount") env.execute("Window Stream WordCount")
} }
......
...@@ -154,7 +154,7 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) ...@@ -154,7 +154,7 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.readFile( val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter()); FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING ) .filter( _.severity == WARNING )
...@@ -240,19 +240,19 @@ public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<My ...@@ -240,19 +240,19 @@ public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<My
*/ */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L; // 3.5 seconds val maxOutOfOrderness = 3500L // 3.5 seconds
var currentMaxTimestamp: Long; var currentMaxTimestamp: Long = _
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime() val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp) currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp; timestamp
} }
override def getCurrentWatermark(): Watermark = { override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound // return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness); new Watermark(currentMaxTimestamp - maxOutOfOrderness)
} }
} }
...@@ -262,7 +262,7 @@ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEv ...@@ -262,7 +262,7 @@ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEv
*/ */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxTimeLag = 5000L; // 5 seconds val maxTimeLag = 5000L // 5 seconds
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime element.getCreationTime
......
...@@ -210,40 +210,40 @@ For a pattern named `start`, the following are valid quantifiers: ...@@ -210,40 +210,40 @@ For a pattern named `start`, the following are valid quantifiers:
start.times(4).optional() start.times(4).optional()
// expecting 2, 3 or 4 occurrences // expecting 2, 3 or 4 occurrences
start.times(2, 4); start.times(2, 4)
// expecting 2, 3 or 4 occurrences and repeating as many as possible // expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy(); start.times(2, 4).greedy()
// expecting 0, 2, 3 or 4 occurrences // expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional(); start.times(2, 4).optional()
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy(); start.times(2, 4).optional().greedy()
// expecting 1 or more occurrences // expecting 1 or more occurrences
start.oneOrMore() start.oneOrMore()
// expecting 1 or more occurrences and repeating as many as possible // expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy(); start.oneOrMore().greedy()
// expecting 0 or more occurrences // expecting 0 or more occurrences
start.oneOrMore().optional() start.oneOrMore().optional()
// expecting 0 or more occurrences and repeating as many as possible // expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy(); start.oneOrMore().optional().greedy()
// expecting 2 or more occurrences // expecting 2 or more occurrences
start.timesOrMore(2); start.timesOrMore(2)
// expecting 2 or more occurrences and repeating as many as possible // expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy(); start.timesOrMore(2).greedy()
// expecting 0, 2 or more occurrences // expecting 0, 2 or more occurrences
start.timesOrMore(2).optional(); start.timesOrMore(2).optional()
// expecting 0, 2 or more occurrences and repeating as many as possible // expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy(); start.timesOrMore(2).optional().greedy()
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
...@@ -729,7 +729,7 @@ pattern.times(2) ...@@ -729,7 +729,7 @@ pattern.times(2)
<p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
internal contiguity see <a href="#consecutive_java">consecutive</a>.</p> internal contiguity see <a href="#consecutive_java">consecutive</a>.</p>
{% highlight scala %} {% highlight scala %}
pattern.times(2, 4); pattern.times(2, 4)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -765,7 +765,7 @@ pattern.oneOrMore().greedy() ...@@ -765,7 +765,7 @@ pattern.oneOrMore().greedy()
Pattern.begin("start").where(_.getName().equals("c")) Pattern.begin("start").where(_.getName().equals("c"))
.followedBy("middle").where(_.getName().equals("a")) .followedBy("middle").where(_.getName().equals("a"))
.oneOrMore().consecutive() .oneOrMore().consecutive()
.followedBy("end1").where(_.getName().equals("b")); .followedBy("end1").where(_.getName().equals("b"))
{% endhighlight %} {% endhighlight %}
<p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p> <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
...@@ -786,7 +786,7 @@ Pattern.begin("start").where(_.getName().equals("c")) ...@@ -786,7 +786,7 @@ Pattern.begin("start").where(_.getName().equals("c"))
Pattern.begin("start").where(_.getName().equals("c")) Pattern.begin("start").where(_.getName().equals("c"))
.followedBy("middle").where(_.getName().equals("a")) .followedBy("middle").where(_.getName().equals("a"))
.oneOrMore().allowCombinations() .oneOrMore().allowCombinations()
.followedBy("end1").where(_.getName().equals("b")); .followedBy("end1").where(_.getName().equals("b"))
{% endhighlight %} {% endhighlight %}
<p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p> <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
...@@ -1491,7 +1491,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outp ...@@ -1491,7 +1491,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outp
pattern: Map[String, Iterable[Event]] => ComplexEvent() pattern: Map[String, Iterable[Event]] => ComplexEvent()
} }
val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag); val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)
~~~ ~~~
The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function. The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
...@@ -1510,7 +1510,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect( ...@@ -1510,7 +1510,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(
out.collect(ComplexEvent()) out.collect(ComplexEvent())
} }
val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag); val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)
~~~ ~~~
</div> </div>
......
...@@ -773,7 +773,7 @@ final class SelectLargeWeightNeighbors extends NeighborsFunctionWithVertexValue[ ...@@ -773,7 +773,7 @@ final class SelectLargeWeightNeighbors extends NeighborsFunctionWithVertexValue[
for (neighbor <- neighbors) { for (neighbor <- neighbors) {
if (neighbor._1.getValue() > 0.5) { if (neighbor._1.getValue() > 0.5) {
out.collect(vertex, neighbor._2); out.collect(vertex, neighbor._2)
} }
} }
} }
......
...@@ -55,7 +55,7 @@ val graph: Graph[java.lang.Long, java.lang.Long, NullValue] = ... ...@@ -55,7 +55,7 @@ val graph: Graph[java.lang.Long, java.lang.Long, NullValue] = ...
val verticesWithCommunity = graph.run(new LabelPropagation[java.lang.Long, java.lang.Long, NullValue](30)) val verticesWithCommunity = graph.run(new LabelPropagation[java.lang.Long, java.lang.Long, NullValue](30))
// print the result // print the result
verticesWithCommunity.print verticesWithCommunity.print()
{% endhighlight %} {% endhighlight %}
</div> </div>
......
...@@ -150,7 +150,7 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] { ...@@ -150,7 +150,7 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
// set the callback to be executed once the request by the client is complete // set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future // the callback simply forwards the result to the result future
resultFuture.onSuccess { resultFuture.onSuccess {
case result: String => resultFuture.complete(Iterable((str, result))); case result: String => resultFuture.complete(Iterable((str, result)))
} }
} }
} }
......
...@@ -61,7 +61,7 @@ val descriptor = new ListStateDescriptor[(String, Integer)]( ...@@ -61,7 +61,7 @@ val descriptor = new ListStateDescriptor[(String, Integer)](
new CustomTypeSerializer) new CustomTypeSerializer)
) )
checkpointedState = getRuntimeContext.getListState(descriptor); checkpointedState = getRuntimeContext.getListState(descriptor)
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
......
...@@ -247,8 +247,8 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100)); ...@@ -247,8 +247,8 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
{% highlight scala %} {% highlight scala %}
env.enableCheckpointing(500); env.enableCheckpointing(500)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100)); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
......
...@@ -131,7 +131,7 @@ val result: Table = orders ...@@ -131,7 +131,7 @@ val result: Table = orders
.select('a.lowerCase(), 'b, 'rowtime) .select('a.lowerCase(), 'b, 'rowtime)
.window(Tumble over 1.hour on 'rowtime as 'hourlyWindow) .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
.groupBy('hourlyWindow, 'a) .groupBy('hourlyWindow, 'a)
.select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount); .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)
{% endhighlight %} {% endhighlight %}
</div> </div>
...@@ -355,7 +355,7 @@ Table result = orders ...@@ -355,7 +355,7 @@ Table result = orders
</td> </td>
<td> <td>
<p>Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the <a href="#over-windows">over windows section</a> for more details.</p> <p>Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the <a href="#over-windows">over windows section</a> for more details.</p>
{% highlight scala %} {% highlight java %}
Table orders = tableEnv.scan("Orders"); Table orders = tableEnv.scan("Orders");
Table result = orders Table result = orders
// define window // define window
...@@ -364,8 +364,8 @@ Table result = orders ...@@ -364,8 +364,8 @@ Table result = orders
.orderBy("rowtime") .orderBy("rowtime")
.preceding("UNBOUNDED_RANGE") .preceding("UNBOUNDED_RANGE")
.following("CURRENT_RANGE") .following("CURRENT_RANGE")
.as("w") .as("w"))
.select("a, b.avg over w, b.max over w, b.min over w") // sliding aggregate .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate
{% endhighlight %} {% endhighlight %}
<p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a>.</p> <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a>.</p>
</td> </td>
...@@ -448,7 +448,7 @@ val result: Table = orders ...@@ -448,7 +448,7 @@ val result: Table = orders
preceding UNBOUNDED_RANGE preceding UNBOUNDED_RANGE
following CURRENT_RANGE following CURRENT_RANGE
as 'w) as 'w)
.select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w,) // sliding aggregate .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate
{% endhighlight %} {% endhighlight %}
<p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a>.</p> <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a>.</p>
</td> </td>
...@@ -614,9 +614,9 @@ Table result = orders ...@@ -614,9 +614,9 @@ Table result = orders
<td> <td>
<p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names, and at least one equality join predicate must be defined through the join operator or using a where or filter operator.</p> <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names, and at least one equality join predicate must be defined through the join operator or using a where or filter operator.</p>
{% highlight scala %} {% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c); val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f); val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
val result = left.join(right).where('a === 'd).select('a, 'b, 'e); val result = left.join(right).where('a === 'd).select('a, 'b, 'e)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -656,12 +656,12 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e) ...@@ -656,12 +656,12 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
<p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p> <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight scala %} {% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime); val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)
val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime); val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime)
val result = left.join(right) val result = left.join(right)
.where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes) .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)
.select('a, 'b, 'e, 'ltime); .select('a, 'b, 'e, 'ltime)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -856,9 +856,9 @@ Table result = left.select("a, b, c").where("a.in(RightTable)"); ...@@ -856,9 +856,9 @@ Table result = left.select("a, b, c").where("a.in(RightTable)");
<td> <td>
<p>Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.</p> <p>Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.</p>
{% highlight scala %} {% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c); val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c); val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.union(right); val result = left.union(right)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -872,9 +872,9 @@ val result = left.union(right); ...@@ -872,9 +872,9 @@ val result = left.union(right);
<td> <td>
<p>Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.</p> <p>Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.</p>
{% highlight scala %} {% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c); val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c); val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.unionAll(right); val result = left.unionAll(right)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -887,9 +887,9 @@ val result = left.unionAll(right); ...@@ -887,9 +887,9 @@ val result = left.unionAll(right);
<td> <td>
<p>Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.</p> <p>Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.</p>
{% highlight scala %} {% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c); val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'e, 'f, 'g); val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
val result = left.intersect(right); val result = left.intersect(right)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -902,9 +902,9 @@ val result = left.intersect(right); ...@@ -902,9 +902,9 @@ val result = left.intersect(right);
<td> <td>
<p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.</p> <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.</p>
{% highlight scala %} {% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c); val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'e, 'f, 'g); val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
val result = left.intersectAll(right); val result = left.intersectAll(right)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -917,9 +917,9 @@ val result = left.intersectAll(right); ...@@ -917,9 +917,9 @@ val result = left.intersectAll(right);
<td> <td>
<p>Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.</p> <p>Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.</p>
{% highlight scala %} {% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c); val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c); val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.minus(right); val result = left.minus(right)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -932,9 +932,9 @@ val result = left.minus(right); ...@@ -932,9 +932,9 @@ val result = left.minus(right);
<td> <td>
<p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.</p> <p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.</p>
{% highlight scala %} {% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c); val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a, 'b, 'c); val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
val result = left.minusAll(right); val result = left.minusAll(right)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -947,9 +947,9 @@ val result = left.minusAll(right); ...@@ -947,9 +947,9 @@ val result = left.minusAll(right);
<td> <td>
<p>Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p> <p>Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
{% highlight scala %} {% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c); val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a); val right = ds2.toTable(tableEnv, 'a)
val result = left.select('a, 'b, 'c).where('a.in(right)); val result = left.select('a, 'b, 'c).where('a.in(right))
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
...@@ -1030,8 +1030,8 @@ Table result3 = in.orderBy("a.asc").offset(10).fetch(5); ...@@ -1030,8 +1030,8 @@ Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
<td> <td>
<p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p> <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
{% highlight scala %} {% highlight scala %}
val in = ds.toTable(tableEnv, 'a, 'b, 'c); val in = ds.toTable(tableEnv, 'a, 'b, 'c)
val result = in.orderBy('a.asc); val result = in.orderBy('a.asc)
{% endhighlight %} {% endhighlight %}
</td> </td>
</tr> </tr>
......
...@@ -93,7 +93,7 @@ myTable.select('string, hashCode('string)) ...@@ -93,7 +93,7 @@ myTable.select('string, hashCode('string))
// register and use the function in SQL // register and use the function in SQL
tableEnv.registerFunction("hashCode", new HashCode(10)) tableEnv.registerFunction("hashCode", new HashCode(10))
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable")
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
...@@ -198,17 +198,17 @@ val myTable = ... // table schema: [a: String] ...@@ -198,17 +198,17 @@ val myTable = ... // table schema: [a: String]
// Use the table function in the Scala Table API (Note: No registration required in Scala Table API). // Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
val split = new Split("#") val split = new Split("#")
// "as" specifies the field names of the generated table. // "as" specifies the field names of the generated table.
myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length); myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length)
myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length); myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length)
// Register the table function to use it in SQL queries. // Register the table function to use it in SQL queries.
tableEnv.registerFunction("split", new Split("#")) tableEnv.registerFunction("split", new Split("#"))
// Use the table function in SQL with LATERAL and TABLE keywords. // Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API) // CROSS JOIN a table function (equivalent to "join" in Table API)
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API) // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE"); tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE")
{% endhighlight %} {% endhighlight %}
**IMPORTANT:** Do not implement TableFunction as a Scala object. A Scala object is a singleton and will cause concurrency issues. **IMPORTANT:** Do not implement TableFunction as a Scala object. A Scala object is a singleton and will cause concurrency issues.
</div> </div>
...@@ -723,7 +723,7 @@ tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); ...@@ -723,7 +723,7 @@ tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
{% highlight scala %} {% highlight scala %}
object hashCode extends ScalarFunction { object hashCode extends ScalarFunction {
var hashcode_factor = 12; var hashcode_factor = 12
override def open(context: FunctionContext): Unit = { override def open(context: FunctionContext): Unit = {
// access "hashcode_factor" parameter // access "hashcode_factor" parameter
...@@ -743,7 +743,7 @@ myTable.select('string, hashCode('string)) ...@@ -743,7 +743,7 @@ myTable.select('string, hashCode('string))
// register and use the function in SQL // register and use the function in SQL
tableEnv.registerFunction("hashCode", hashCode) tableEnv.registerFunction("hashCode", hashCode)
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable")
{% endhighlight %} {% endhighlight %}
</div> </div>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册