@@ -299,20 +299,45 @@ The user is expected to connect to the outside world through the source and the
#### Sources
The user can create (connect to) data streams through the different implementations of the `SourceFunction` interface using `StreamExecutionEnvironment.addSource(sourceFunction)`. By default, sources run with a parallelism of 1.
To create parallel sources, the user's source function needs to implement `ParallelSourceFunction` or extend `RichParallelSourceFunction`, in which case the source will have the parallelism of the environment. The parallelism of a `ParallelSourceFunction` can be changed afterwards using `source.setParallelism(parallelism)`.
Sources can be created by using `StreamExecutionEnvironment.addSource(sourceFunction)`.
Either use one of the source functions that come with Flink or write a custom source
by implementing the `SourceFunction` interface. By default, sources run with a
parallelism of 1. To create parallel sources, the user's source function needs to implement
`ParallelSourceFunction` or extend `RichParallelSourceFunction`, in which case the source will have
the parallelism of the environment. The parallelism of a `ParallelSourceFunction` can be changed
after creation by using `source.setParallelism(parallelism)`.
The `SourceFunction` interface has two methods: `reachedEnd()` and `next()`. The former is used
by the system to determine whether more input data is available. This method can block if no
data is available right now but more data might arrive in the future. The `next()` method
is called to get the next data element. It is only called if `reachedEnd()` returns
false. `next()` can also block if no data is currently available but more will arrive in the
future.
Both methods must react to thread interrupt calls and break out of blocking calls with an
`InterruptedException`. A method may ignore interrupt calls and/or swallow `InterruptedException`s
only if it is guaranteed to return almost immediately, regardless of the input.
This is true, for example, for file streams, where the call is guaranteed to return after a very
short I/O delay on the order of milliseconds.
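
The `reachedEnd()`/`next()` contract described above can be sketched without any Flink dependency. In this minimal sketch, `PullSource`, `CounterSource`, `QueueSource`, the `END` marker, and `drain` are hypothetical names introduced for illustration; they only mirror the two methods named in the text and are not Flink's actual classes. The queue-backed source blocks in `reachedEnd()` and lets `InterruptedException` propagate, while the counter source never blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PullSourceSketch {

    /** Hypothetical stand-in that mirrors the two methods described in the text. */
    interface PullSource<T> {
        boolean reachedEnd() throws Exception;
        T next() throws Exception;
    }

    /** Bounded source emitting 0 .. limit-1; it never blocks. */
    static class CounterSource implements PullSource<Integer> {
        private final int limit;
        private int current = 0;

        CounterSource(int limit) { this.limit = limit; }

        public boolean reachedEnd() { return current >= limit; }
        public Integer next() { return current++; }
    }

    /** Queue-backed source; reachedEnd() blocks until an element or the end marker arrives. */
    static class QueueSource implements PullSource<String> {
        static final String END = "__END__"; // hypothetical end-of-stream marker
        private final BlockingQueue<String> queue;
        private String pending; // element fetched by reachedEnd() but not yet returned

        QueueSource(BlockingQueue<String> queue) { this.queue = queue; }

        public boolean reachedEnd() throws InterruptedException {
            if (pending == null) {
                pending = queue.take(); // blocking call; throws InterruptedException on interrupt
            }
            return END.equals(pending);
        }

        public String next() throws InterruptedException {
            if (pending == null) {
                pending = queue.take();
            }
            String result = pending;
            pending = null;
            return result;
        }
    }

    /** Driver loop: next() is only called after reachedEnd() returned false. */
    static <T> List<T> drain(PullSource<T> source) throws Exception {
        List<T> out = new ArrayList<>();
        while (!source.reachedEnd()) {
            out.add(source.next());
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drain(new CounterSource(3))); // prints [0, 1, 2]

        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add(QueueSource.END);
        System.out.println(drain(new QueueSource(queue))); // prints [a, b]
    }
}
```

Because `QueueSource` does not catch the `InterruptedException` thrown by `take()`, an interrupted thread leaves the blocking call right away, which is the behavior the text asks for.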
In addition to the bounded data sources (with similar method signatures as the
[batch API](programming_guide.html#data-sources)) there are several predefined stream sources
accessible from the `StreamExecutionEnvironment`:
* *Socket text stream*: Creates a new `DataStream` that contains the strings received
from the given socket. Strings are decoded by the system's default character set. The user
can optionally set the delimiters or the number of connection retries in case of errors.
Usage: `env.socketTextStream(hostname, port,…)`
The `SourceFunction` interface contains only two methods: `run(Collector<T> out)` and `cancel()`.
The `run` method will be called only once when the program starts, and should encapsulate the logic for generating the `DataStream`. Any object collected to the collector will be part of the `DataStream`. The `cancel` method will be called whenever the topology has failed for some reason.
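
The `run`/`cancel` pairing described above can be illustrated with a small self-contained sketch. The `Collector` and `SourceFunction` interfaces below are hypothetical minimal stand-ins declared locally so the example runs without Flink; `CountingSource` and the `volatile` running flag are assumptions about one reasonable way to make `run` stop after `cancel` has been called.

```java
import java.util.ArrayList;
import java.util.List;

class PushSourceSketch {

    /** Hypothetical minimal collector; stands in for the Collector<T> named in the text. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical stand-in with the two methods described in the text. */
    interface SourceFunction<T> {
        void run(Collector<T> out) throws Exception;
        void cancel();
    }

    /** Emits 0 .. limit-1 and stops early once cancel() has been called. */
    static class CountingSource implements SourceFunction<Long> {
        private final long limit;
        // volatile so a cancel() from another thread is visible inside run()
        private volatile boolean running = true;

        CountingSource(long limit) { this.limit = limit; }

        public void run(Collector<Long> out) {
            for (long i = 0; running && i < limit; i++) {
                out.collect(i); // every collected object becomes part of the stream
            }
        }

        public void cancel() { running = false; }
    }

    public static void main(String[] args) {
        List<Long> seen = new ArrayList<>();
        CountingSource source = new CountingSource(5);
        // The collector cancels the source once it has seen the value 2.
        source.run(record -> {
            seen.add(record);
            if (record == 2) {
                source.cancel();
            }
        });
        System.out.println(seen); // prints [0, 1, 2]
    }
}
```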
* *Text file stream*: Creates a new `DataStream` that contains the lines of the files created
(or modified) in a given directory. The system continuously monitors the given path, and processes
any new files or modifications based on the settings. The file will be read with the system's
default character set.
Usage: `env.readFileStream(String path, long checkFrequencyMillis, WatchType watchType)`
In addition to the static data sources (with similar method signatures as the [batch API](programming_guide.html#data-sources)) there are several predefined stream sources accessible from the `StreamExecutionEnvironment`:
* *Message queue connectors*: There are pre-implemented connectors for a number of popular message
queue services. Please refer to the section on [connectors](#stream-connectors) for more details.
* *Socket text stream*: Creates a new `DataStream` that contains the strings received infinitely from the given socket. Strings are decoded by the system's default character set. The user can optionally set the delimiters or the number of connection retries in case of errors.
Usage: `env.socketTextStream(hostname, port,…)`
* *Text file stream*: Creates a new `DataStream` that contains the lines of the files created (or modified) in a given directory. The system continuously monitors the given path, and processes any new files or modifications based on the settings. The file will be read with the system's default character set.
Usage: `env.readFileStream(String path, long checkFrequencyMillis, WatchType watchType)`
* *Message queue connectors*: There are pre-implemented connectors for a number of the most popular message queue services. Please refer to the section on [connectors](#stream-connectors) for more details.
* *Custom source*: Creates a new `DataStream` by using a user-defined `SourceFunction` implementation.
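
To make the socket text stream item above more concrete, the following sketch shows the kind of processing it describes: bytes are decoded with the system's default character set and cut into strings at a delimiter. This is not Flink's implementation. `DelimitedTextSketch` and `readRecords` are hypothetical names, the input is an in-memory stream instead of a socket, and emitting a trailing record that has no closing delimiter is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

class DelimitedTextSketch {

    /** Decodes the stream with the default charset and splits it at the delimiter. */
    static List<String> readRecords(InputStream in, char delimiter) throws IOException {
        List<String> records = new ArrayList<>();
        Reader reader = new InputStreamReader(in, Charset.defaultCharset());
        StringBuilder current = new StringBuilder();
        int c;
        while ((c = reader.read()) != -1) {
            if (c == delimiter) {
                records.add(current.toString()); // delimiter closes the current record
                current.setLength(0);
            } else {
                current.append((char) c);
            }
        }
        if (current.length() > 0) {
            records.add(current.toString()); // assumption: emit a trailing partial record
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "first\nsecond\nthird".getBytes(Charset.defaultCharset());
        System.out.println(readRecords(new ByteArrayInputStream(data), '\n'));
        // prints [first, second, third]
    }
}
```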