diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java index ec32e9f41517a831def4557c6129e2e735f2e0be..e9b60f48b3cf60589185708a8b3f37870e216efa 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java @@ -23,11 +23,21 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer; /** - * This example shows an implementation of WordCount with data from socket. - * + * This example shows an implementation of WordCount with data from a text + * socket. To run the example make sure that the service providing the text data + * is already up and running. + * *

- * Usage: SocketTextStreamWordCount <hostname> <port > <result path>
- * + * To start an example socket text stream on your local machine run netcat from + * a command line: nc -lk 9999, where the parameter specifies the + * port number. + * + * + *

+ * Usage: + * SocketTextStreamWordCount <hostname> <port> [<result path>] + *
+ * *

* This example shows how to: *

+ * + * @see netcat */ public class SocketTextStreamWordCount { public static void main(String[] args) throws Exception { @@ -44,16 +56,18 @@ public class SocketTextStreamWordCount { } // set up the execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment + .getExecutionEnvironment(); // get input data - DataStream text = env.socketTextStream(hostname, port); + DataStream text = env.socketTextStream(hostName, port); DataStream> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0).sum(1); + .groupBy(0) + .sum(1); if (fileOutput) { counts.writeAsText(outputPath, 1); @@ -62,7 +76,7 @@ public class SocketTextStreamWordCount { } // execute program - env.execute("WordCount with SocketTextStream Example"); + env.execute("WordCount from SocketTextStream Example"); } // ************************************************************************* @@ -70,30 +84,23 @@ public class SocketTextStreamWordCount { // ************************************************************************* private static boolean fileOutput = false; - private static String hostname; + private static String hostName; private static int port; private static String outputPath; private static boolean parseParameters(String[] args) { - if (args.length > 0) { - // parse input arguments - if (args.length == 3) { - fileOutput = true; - hostname = args[0]; - port = Integer.valueOf(args[1]); - outputPath = args[2]; - } else if (args.length == 2) { - hostname = args[0]; - port = Integer.valueOf(args[1]); - } else { - System.err.println("Usage: SocketTextStreamWordCount "); - return false; - } + // parse input arguments + if (args.length == 3) { + fileOutput = true; + hostName = args[0]; + port = Integer.valueOf(args[1]); + outputPath = args[2]; + } else if (args.length 
== 2) { + hostName = args[0]; + port = Integer.valueOf(args[1]); } else { - System.out.println("Executing WordCount example with data from socket."); - System.out.println(" Provide parameters to connect data source."); - System.out.println(" Usage: SocketTextStreamWordCount "); + System.err.println("Usage: SocketTextStreamWordCount []"); return false; } return true; diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala new file mode 100644 index 0000000000000000000000000000000000000000..b38764c46875ebfabe23e98117eaf2dde30ddfcf --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.socket + +import org.apache.flink.streaming.api.scala._ + +/** + * This example shows an implementation of WordCount with data from a text socket. 
+ * To run the example make sure that the service providing the text data is already up and running. + * + * To start an example socket text stream on your local machine run netcat from a command line, + * where the parameter specifies the port number: + * + * {{{ + * nc -lk 9999 + * }}} + * + * Usage: + * {{{ + * SocketTextStreamWordCount <hostname> <port> [<output path>] + * }}} + * + * This example shows how to: + * + * - use StreamExecutionEnvironment.socketTextStream + * - write a simple Flink Streaming program in Scala. + * - write and use user-defined functions. + */ +object SocketTextStreamWordCount { + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Read text lines from the socket, split them into lowercase words and count each word + val text = env.socketTextStream(hostName, port) + val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } + .map { (_, 1) } + .groupBy(0) + .sum(1) + + if (fileOutput) { + counts.writeAsCsv(outputPath, 1) + } else { + counts print + } + + env.execute("Scala SocketTextStreamWordCount Example") + } + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length == 3) { + fileOutput = true + hostName = args(0) + port = args(1).toInt + outputPath = args(2) + } else if (args.length == 2) { + hostName = args(0) + port = args(1).toInt + } else { + System.err.println("Usage: SocketTextStreamWordCount []") + return false + } + true + } + + private var fileOutput: Boolean = false + private var hostName: String = null + private var port: Int = 0 + private var outputPath: String = null + +} diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala index 
a43f4798b697a48e669ac526688b0bdc195e4e68..e3ef95ec53048fb7e4ca46a3ccc0d03e0ec367a5 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala @@ -18,12 +18,11 @@ package org.apache.flink.streaming.scala.examples.windowing - import java.util.concurrent.TimeUnit._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.windowing.{Delta, Time} -import org.apache.flink.api.scala._ + import scala.Stream._ import scala.math._ import scala.util.Random diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala index d6c0363527c91823cb032054e3e05cf6dc3094f7..0b783650eaf316b4278b8a30c399583d7f657a72 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala @@ -18,8 +18,7 @@ package org.apache.flink.streaming.scala.examples.windowing -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala._ import scala.Stream._ import scala.util.Random @@ -39,11 +38,13 @@ object WindowJoin { val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2)) val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2)) - //Join the two input streams by id on the last second 
every 2 seconds and create new + //Join the two input streams by id on the last 2 seconds every second and create new //Person objects containing both name and age val joined = - names.join(ages).onWindow(1, TimeUnit.SECONDS).every(2, TimeUnit.SECONDS) - .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) } + names.join(ages).onWindow(2, TimeUnit.SECONDS) + .every(1, TimeUnit.SECONDS) + .where("id") + .equalTo("id") { (n, a) => Person(n.name, a.age) } joined print diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 6d94de72e0638b819194679517bc780dfac3ad97..ffe91cb1eab093ebf1d31b127ceaa53803037efb 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -214,8 +214,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * * */ - def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis: - Long = 0): DataStream[T] = { + def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), + maxWaitTimeMillis:Long = 0): DataStream[R] = { val iterativeStream = javaStream.iterate(maxWaitTimeMillis) val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))