提交 b22406a6 编写于 作者: M mbalassi

[streaming] [scala] scala SocketTextStream added and minor fixes

Organized imports for streaming scala examples
Added template parameter for scala streaming iterate
Minor fixes in streaming examples
上级 19066b52
...@@ -23,11 +23,21 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -23,11 +23,21 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer; import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
/** /**
* This example shows an implementation of WordCount with data from socket. * This example shows an implementation of WordCount with data from a text
* * socket. To run the example make sure that the service providing the text data
* is already up and running.
*
* <p> * <p>
* Usage: <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port &gt; &lt;result path&gt;</code><br> * To start an example socket text stream on your local machine run netcat from
* * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
* port number.
*
*
* <p>
* Usage:
* <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
* <br>
*
* <p> * <p>
* This example shows how to: * This example shows how to:
* <ul> * <ul>
...@@ -35,6 +45,8 @@ import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer; ...@@ -35,6 +45,8 @@ import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
* <li>write a simple Flink program, * <li>write a simple Flink program,
* <li>write and use user-defined functions. * <li>write and use user-defined functions.
* </ul> * </ul>
*
* @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
*/ */
public class SocketTextStreamWordCount { public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
...@@ -44,16 +56,18 @@ public class SocketTextStreamWordCount { ...@@ -44,16 +56,18 @@ public class SocketTextStreamWordCount {
} }
// set up the execution environment // set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
// get input data // get input data
DataStream<String> text = env.socketTextStream(hostname, port); DataStream<String> text = env.socketTextStream(hostName, port);
DataStream<Tuple2<String, Integer>> counts = DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1) // split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer()) text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1" // group by the tuple field "0" and sum up tuple field "1"
.groupBy(0).sum(1); .groupBy(0)
.sum(1);
if (fileOutput) { if (fileOutput) {
counts.writeAsText(outputPath, 1); counts.writeAsText(outputPath, 1);
...@@ -62,7 +76,7 @@ public class SocketTextStreamWordCount { ...@@ -62,7 +76,7 @@ public class SocketTextStreamWordCount {
} }
// execute program // execute program
env.execute("WordCount with SocketTextStream Example"); env.execute("WordCount from SocketTextStream Example");
} }
// ************************************************************************* // *************************************************************************
...@@ -70,30 +84,23 @@ public class SocketTextStreamWordCount { ...@@ -70,30 +84,23 @@ public class SocketTextStreamWordCount {
// ************************************************************************* // *************************************************************************
private static boolean fileOutput = false; private static boolean fileOutput = false;
private static String hostname; private static String hostName;
private static int port; private static int port;
private static String outputPath; private static String outputPath;
private static boolean parseParameters(String[] args) { private static boolean parseParameters(String[] args) {
if (args.length > 0) { // parse input arguments
// parse input arguments if (args.length == 3) {
if (args.length == 3) { fileOutput = true;
fileOutput = true; hostName = args[0];
hostname = args[0]; port = Integer.valueOf(args[1]);
port = Integer.valueOf(args[1]); outputPath = args[2];
outputPath = args[2]; } else if (args.length == 2) {
} else if (args.length == 2) { hostName = args[0];
hostname = args[0]; port = Integer.valueOf(args[1]);
port = Integer.valueOf(args[1]);
} else {
System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
return false;
}
} else { } else {
System.out.println("Executing WordCount example with data from socket."); System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
System.out.println(" Provide parameters to connect data source.");
System.out.println(" Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
return false; return false;
} }
return true; return true;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.scala.examples.socket
import org.apache.flink.streaming.api.scala._
/**
* This example shows an implementation of WordCount with data from a text socket.
* To run the example make sure that the service providing the text data is already up and running.
*
* To start an example socket text stream on your local machine run netcat from a command line,
* where the parameter specifies the port number:
*
* {{{
* nc -lk 9999
* }}}
*
* Usage:
* {{{
* SocketTextStreamWordCount <hostname> <port> <output path>
* }}}
*
* This example shows how to:
*
* - use StreamExecutionEnvironment.socketTextStream
* - write a simple Flink Streaming program in scala.
* - write and use user-defined functions.
*/
object SocketTextStreamWordCount {
def main(args: Array[String]) {
if (!parseParameters(args)) {
return
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
//Create streams for names and ages by mapping the inputs to the corresponding objects
val text = env.socketTextStream(hostName, port)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
if (fileOutput) {
counts.writeAsCsv(outputPath, 1)
} else {
counts print
}
env.execute("Scala SocketTextStreamWordCount Example")
}
private def parseParameters(args: Array[String]): Boolean = {
if (args.length == 3) {
fileOutput = true
hostName = args(0)
port = args(1).toInt
outputPath = args(2)
} else if (args.length == 2) {
hostName = args(0)
port = args(1).toInt
} else {
System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]")
return false
}
true
}
private var fileOutput: Boolean = false
private var hostName: String = null
private var port: Int = 0
private var outputPath: String = null
}
...@@ -18,12 +18,11 @@ ...@@ -18,12 +18,11 @@
package org.apache.flink.streaming.scala.examples.windowing package org.apache.flink.streaming.scala.examples.windowing
import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeUnit._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.windowing.{Delta, Time} import org.apache.flink.streaming.api.scala.windowing.{Delta, Time}
import org.apache.flink.api.scala._
import scala.Stream._ import scala.Stream._
import scala.math._ import scala.math._
import scala.util.Random import scala.util.Random
......
...@@ -18,8 +18,7 @@ ...@@ -18,8 +18,7 @@
package org.apache.flink.streaming.scala.examples.windowing package org.apache.flink.streaming.scala.examples.windowing
import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.Stream._ import scala.Stream._
import scala.util.Random import scala.util.Random
...@@ -39,11 +38,13 @@ object WindowJoin { ...@@ -39,11 +38,13 @@ object WindowJoin {
val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2)) val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2))
val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2)) val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2))
//Join the two input streams by id on the last second every 2 seconds and create new //Join the two input streams by id on the last 2 seconds every second and create new
//Person objects containing both name and age //Person objects containing both name and age
val joined = val joined =
names.join(ages).onWindow(1, TimeUnit.SECONDS).every(2, TimeUnit.SECONDS) names.join(ages).onWindow(2, TimeUnit.SECONDS)
.where("id").equalTo("id") { (n, a) => Person(n.name, a.age) } .every(1, TimeUnit.SECONDS)
.where("id")
.equalTo("id") { (n, a) => Person(n.name, a.age) }
joined print joined print
......
...@@ -214,8 +214,8 @@ class DataStream[T](javaStream: JavaStream[T]) { ...@@ -214,8 +214,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* *
* *
*/ */
def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis: def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
Long = 0): DataStream[T] = { maxWaitTimeMillis:Long = 0): DataStream[R] = {
val iterativeStream = javaStream.iterate(maxWaitTimeMillis) val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
val (feedback, output) = stepFunction(new DataStream[T](iterativeStream)) val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册