提交 6dfb3fad 编写于 作者: M mbalassi

[FLINK-1173] [streaming] SocketTextStream minor fixes + documentation

This closes #204
上级 80416acb
......@@ -44,15 +44,14 @@ public class StreamingWordCount {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.fromElements("Who's there?",
"I think I hear them. Stand, ho! Who's there?")
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.groupBy(0)
.sum(1);
dataStream.print();
env.execute();
env.execute("Socket Stream WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
......@@ -67,6 +66,14 @@ public class StreamingWordCount {
}
~~~
To run the example program start the input stream with netcat first from a terminal:
~~~batch
nc -lk 9999
~~~
The lines typed to this terminal are submitted as a source for your streaming job.
[Back to top](#top)
Program Skeleton
......@@ -91,7 +98,7 @@ StreamExecutionEnvironment.createRemoteEnvironment(params…)
For connecting to data streams the `StreamExecutionEnvironment` has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the [basics](#basics) section.
~~~java
env.readTextFile(filePath)
env.socketTextStream(host, port)
~~~
After defining the data stream sources, the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [operations](#operations) section.
......@@ -106,10 +113,10 @@ The processed data can be pushed to different outputs called sinks. The user can
dataStream.writeAsCsv(path)
~~~
Once the complete program is specified `execute()` needs to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
Once the complete program is specified `execute(programName)` is to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
~~~java
env.execute()
env.execute(programName)
~~~
[Back to top](#top)
......@@ -142,16 +149,18 @@ Usage: `operator.setParallelism(1)`
### Sources
The user can connect to data streams by the different implemenations of `DataStreamSource` using methods provided in `StreamExecutionEnvironment`. There are several predefined ones similar to the ones provided by the batch API like:
The user can connect to data streams by the different implemenations of `DataStreamSource` using methods provided by the `StreamExecutionEnvironment`. There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
* `env.genereateSequence(from, to)`
* `env.fromElements(elements…)`
* `env.fromCollection(collection)`
* `env.readTextFile(filepath)`
* `socketTextStream(hostname, port)`
* `readTextStream(filepath)`
* `genereateSequence(from, to)`
* `fromElements(elements…)`
* `fromCollection(collection)`
* `readTextFile(filepath)`
These can be used to easily test and debug streaming programs. There are also some streaming specific sources for example `env.readTextStream(filepath)` which iterates over the same file infinitely providing yet another nice testing tool.
There are implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
Besides the pre-defined solutions the user can implement their own source by implementing the `SourceFunction` interface and using the `env.addSource(sourceFunction)` method of the `StreamExecutionEnvironment`.
These can be used to easily test and debug streaming programs.
There are pre-implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
Besides the pre-defined solutions the user can implement their own source by implementing the `SourceFunction` interface and using the `addSource(sourceFunction)` method of the `StreamExecutionEnvironment`.
### Sinks
......@@ -326,7 +335,7 @@ dataStream1.connect(dataStream2)
})
~~~
#### winddowReduceGroup on ConnectedDataStream
#### windowReduceGroup on ConnectedDataStream
The windowReduceGroup operator applies a user defined `CoGroupFunction` to time aligned windows of the two data streams and return zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
~~~java
......@@ -404,6 +413,7 @@ DataStream<Integer> tail = head.map(new IterationTail());
iteration.closeWith(tail);
~~~
Or to use with output splitting:
~~~java
SplitDataStream<Integer> tail = head.map(new IterationTail()).split(outputSelector);
iteration.closeWith(tail.select("iterate"));
......@@ -449,6 +459,7 @@ env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
Most operators allow setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable.
Usage:
~~~java
operator.setMutability(isMutable)
~~~
......
......@@ -280,7 +280,20 @@ public abstract class StreamExecutionEnvironment {
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter));
}
/**
* Creates a new DataStream that contains the strings received infinitely
* from socket. Received strings are decoded by the system's default
* character set, uses '\n' as delimiter.
*
* @param hostname
* The host name which a server socket bind.
* @param port
* The port number which a server socket bind. A port number of
* 0 means that the port number is automatically allocated.
* @return A DataStream, containing the strings received from socket.
*/
public DataStreamSource<String> socketTextStream(String hostname, int port) {
return socketTextStream(hostname, port, '\n');
}
......
......@@ -17,18 +17,22 @@
package org.apache.flink.streaming.api.function.source;
import org.apache.flink.util.Collector;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
public class SocketTextStreamFunction implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
public class SocketTextStreamFunction extends RichSourceFunction<String> {
private static final long serialVersionUID = 1L;
private String hostname;
private int port;
private char delimiter;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
public SocketTextStreamFunction(String hostname, int port, char delimiter) {
this.hostname = hostname;
......@@ -36,9 +40,16 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
this.delimiter = delimiter;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
socket = new Socket();
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
}
@Override
public void invoke(Collector<String> collector) throws Exception {
Socket socket = new Socket(hostname, port);
while (!socket.isClosed() && socket.isConnected()) {
streamFromSocket(collector, socket);
}
......@@ -66,4 +77,10 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
collector.collect(buffer.toString());
}
}
@Override
public void close() throws Exception {
socket.close();
super.close();
}
}
......@@ -242,16 +242,13 @@ under the License.
<archive>
<manifestEntries>
<program-class>org.apache.flink.streaming.examples.wordcount.SocketTextStreamWordCount</program-class>
<program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
</manifestEntries>
</archive>
<includes>
<include>org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount$*.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
<include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount$Tokenizer.class</include>
</includes>
</configuration>
</execution>
......
......@@ -15,11 +15,12 @@
* limitations under the License.
*/
package org.apache.flink.streaming.examples.wordcount;
package org.apache.flink.streaming.examples.socket;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
/**
* This example shows an implementation of WordCount with data from socket.
......@@ -50,7 +51,7 @@ public class SocketTextStreamWordCount {
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new WordCount.Tokenizer())
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0).sum(1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册