提交 8500ad08 编写于 作者: G Gyula Fora 提交者: mbalassi

[streaming] Updated streaming guide for recent connector and data source changes

上级 11518b58
......@@ -170,7 +170,9 @@ Usage: `operator.setParallelism(1)`
### Sources
The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1 which can be increased using the `.setParallelism(dop)` method as we will see later at the operator settings.
The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.
To create parallel sources the users source function needs to implement `ParallelSourceFunction` or extend `RichParallelSourceFunction` in which cases the source will have the parallelism of the environment. The degree of parallelism for ParallelSourceFunctions can be changed afterwards using `source.setParallelism(int dop)`.
There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
......@@ -617,7 +619,7 @@ Stream connectors
Connectors provide an interface for accessing data from various third party sources (message queues). Currently four connectors are natively supported, namely [Apache Kafka](https://kafka.apache.org/), [RabbitMQ](http://www.rabbitmq.com/), [Apache Flume](https://flume.apache.org/index.html) and [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis).
Typically the connector packages consist of an abstract source and sink (with the exception of Twitter where only a source is provided). The burden of the user is to implement a subclass of these abstract classes specifying a serializer and a deserializer function.
Typically the connector packages consist of a source and sink class (with the exception of Twitter where only a source is provided). To use these sources the user needs to pass Serialization/Deserialization schemas for the connectors for the desired types. (Or use some predefined ones)
To run an application using one of these connectors usually additional third party components are required to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. [Docker containers](#docker-containers-for-connectors) are also provided encapsulating these services to aid users getting started with connectors.
......@@ -631,106 +633,40 @@ This connector provides access to data streams from [Apache Kafka](https://kafka
* If the Kafka zookeeper and server are running on a remote machine then in the config/server.properties file the advertised.host.name must be set to the machine's IP address.
#### Kafka Source
An abstract class providing an interface for receiving data from Kafka. By implementing the user must:
A class providing an interface for receiving data from Kafka.
* Write a constructor calling the constructor of the abstract class,
* Write a deserializer function which processes the data coming from Kafka,
* Stop the source manually when necessary with one of the close functions.
The followings have to be provided for the `KafkaSource(..)` constructor in order:
The implemented class must extend `KafkaSource`, for example: `KafkaSource<String>`.
1. The hostname
2. The group name
3. The topic name
4. The parallelism
5. Deserialisation schema
##### Constructor
An example of an implementation of a constructor:
public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
super(zkQuorum, groupId, topicId, numThreads);
##### Deserializer
An example of an implementation of a deserializer:
public String deserialize(byte[] msg) {
String s = new String(msg);
return new String(s);
DataStream<String> stream = env
.addSource(new KafkaSource<String>("localhost:2181", "group", "test",new SimpleStringSchema()))
The source closes when it receives the String `"q"`.
###### Close<a name="kafka_source_close"></a>
Two types of close functions are available, namely `closeWithoutSend()` and `sendAndClose()`. The former closes the connection immediately and no further data will be sent, while the latter closes the connection only when the next message is sent after this call.
In the example provided `closeWithoutSend()` is used because here the String `"q"` is meta-message indicating the end of the stream and there is no need to forward it.
#### Kafka Sink
An abstract class providing an interface for sending data to Kafka. By implementing the user must:
* Write a constructor calling the constructor of the abstract class,
* Write a serializer function to send data in the desired form to Kafka,
* Stop the sink manually when necessary with one of the close functions.
The implemented class must extend `KafkaSink`, for example `KafkaSink<String, String>`.
##### Constructor
An example of an implementation of a constructor:
public MyKafkaSink(String topicId, String brokerAddr) {
super(topicId, brokerAddr);
##### Serializer
An example of an implementation of a serializer:
public String serialize(String tuple) {
return tuple;
A class providing an interface for sending data to Kafka.
##### Close
The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`.
The followings have to be provided for the `KafkaSink()` constructor in order:
#### Building A Topology
To use a Kafka connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `KafkaSource` as parameter:
DataStream<String> stream1 = env.
addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
The followings have to be provided for the `MyKafkaSource()` constructor in order:
1. The hostname
2. The group name
3. The topic name
4. The parallelism
1. The topic name
2. The hostname
3. Serialisation schema
Similarly to use a Kafka connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `KafkaSink`:
DataStream<String> stream2 = env
.addSource(new MySource())
.addSink(new MyKafkaSink("test", "localhost:9092"));
stream.addSink(new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringSchema()));
The followings have to be provided for the `MyKafkaSink()` constructor in order:
1. The topic name
2. The hostname
More about Kafka can be found [here](https://kafka.apache.org/documentation.html).
......@@ -741,114 +677,40 @@ More about Kafka can be found [here](https://kafka.apache.org/documentation.html
This connector provides access to datastreams from [Apache Flume](http://flume.apache.org/).
#### Installing Apache Flume
[Download](http://flume.apache.org/download.html) Apache Flume. A configuration file is required for starting agents in Flume. A configuration file for running the example can be found [here](#config_file).
[Download](http://flume.apache.org/download.html) Apache Flume. A configuration file is required for starting agents in Flume. A configuration file for running the example can be found [here](#config_file).
#### Flume Source
An abstract class providing an interface for receiving data from Flume. By implementing the user must:
A class providing an interface for receiving data from Flume.
* Write a constructor calling the constructor of the abstract class,
* Write a deserializer function which processes the data coming from Flume,
* Stop the source manually when necessary with one of the close functions.
The followings have to be provided for the `FlumeSource(…)` constructor in order:
The implemented class must extend `FlumeSource` for example: `FlumeSource<String>`
##### Constructor
An example of an implementation of a constructor:
MyFlumeSource(String host, int port) {
super(host, port);
1. The hostname
2. The port number
3. Deserialisation schema
##### Deserializer
An example of an implementation of a deserializer:
public String deserialize(byte[] msg) {
String s = (String) SerializationUtils.deserialize(msg);
String out = s;
if (s.equals("q")) {
return out;
DataStream<String> stream = env
.addSource(new FlumeSource<String>("localhost", 41414, new SimpleStringSchema()))
The source closes when it receives the String `"q"`.
##### Close<a name="flume_source_close"></a>
Two types of close functions are available, namely `closeWithoutSend()` and `sendAndClose()`.The former closes the connection immediately and no further data will be sent, while the latter closes the connection only when the next message is sent after this call.
In the example `closeWithoutSend()` is used because here the String `"q"` is meta-message indicating the end of the stream and there is no need to forward it.
#### Flume Sink
An abstract class providing an interface for sending data to Flume. By implementing the user must:
* Write a constructor calling the constructor of the abstract class,
* Write a serializer function to send data in the desired form to Flume,
* Stop the sink manually when necessary with one of the close functions.
A class providing an interface for sending data to Flume.
The implemented class must extend `FlumeSink`, for example `FlumeSink<String, String>`.
##### Constructor
An example of an implementation of a constructor:
public MyFlumeSink(String host, int port) {
super(host, port);
##### Serializer
An example of an implementation of a serializer.
public byte[] serialize(String tuple) {
if (tuple.equals("q")) {
try {
} catch (Exception e) {
new RuntimeException("Error while closing Flume connection with " + port + " at "
+ host, e);
return SerializationUtils.serialize(tuple);
##### Close
The API provided is the [same](#flume_source_close) as the one for `FlumeSource`.
#### Building A Topology
To use a Flume connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `FlumeSource` as parameter:
DataStream<String> dataStream1 = env
.addSource(new MyFlumeSource("localhost", 41414))
The followings have to be provided for the `MyFlumeSource()` constructor in order:
The followings have to be provided for the `FlumeSink(…)` constructor in order:
1. The hostname
2. The port number
3. Serialisation schema
Similarly to use a Flume connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `FlumeSink`
DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyFlumeSink("localhost", 42424));
stream.addSink(new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
The followings have to be provided for the `MyFlumeSink()` constructor in order:
1. The hostname
2. The port number
##### Configuration file<a name="config_file"></a>
An example of a configuration file:
......@@ -890,107 +752,38 @@ This connector provides access to datastreams from [RabbitMQ](http://www.rabbitm
Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation the server automatically starts and the application connecting to RabbitMQ can be launched.
#### RabbitMQ Source
An abstract class providing an interface for receiving data from RabbitMQ. By implementing the user must:
* Write a constructor calling the constructor of the abstract class,
* Write a deserializer function which processes the data coming from RabbitMQ,
* Stop the source manually when necessary with one of the close functions.
The implemented class must extend `RabbitMQSource` for example: `RabbitMQSource<String>`
A class providing an interface for receiving data from RabbitMQ.
##### Constructor
An example of an implementation of a constructor:
The followings have to be provided for the `RMQSource(…)` constructor in order:
public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
1. The hostname
2. The queue name
3. Deserialisation schema
##### Deserializer
An example of an implemetation of a deserializer:
public String deserialize(byte[] t) {
String s = (String) SerializationUtils.deserialize(t);
String out = s;
if (s.equals("q")) {
return out;
DataStream<String> stream = env
.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
The source closes when it receives the String `"q"`.
##### Close<a name="rmq_source_close"></a>
Two types of close functions are available, namely `closeWithoutSend()` and `sendAndClose()`. The former closes the connection immediately and no further data will be sent, while the latter closes the connection only when the next message is sent after this call.
Closes the connection only when the next message is sent after this call.
In the example `closeWithoutSend()` is used because here the String `"q"` is meta-message indicating the end of the stream and there is no need to forward it.
#### RabbitMQ Sink
An abstract class providing an interface for sending data to RabbitMQ. By implementing the user must:
* Write a constructor calling the constructor of the abstract class
* Write a serializer function to send data in the desired form to RabbitMQ
* Stop the sink manually when necessary with one of the close functions
The implemented class must extend `RabbitMQSink` for example: `RabbitMQSink<String, String>`
##### Constructor
An example of an implementation of a constructor:
public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
A class providing an interface for sending data to RabbitMQ.
##### Serializer
An example of an implementation of a serializer.
public byte[] serialize(Tuple tuple) {
if (t.getField(0).equals("q")) {
return SerializationUtils.serialize(tuple.f0);
##### Close
The API provided is the [same](#rmq_source_close) as the one for `RabbitMQSource`.
#### Building A Topology
To use a RabbitMQ connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `RabbitMQSource` as parameter:
DataStream<String> dataStream1 = env
.addSource(new MyRMQSource("localhost", "hello"))
The followings have to be provided for the `MyRabbitMQSource()` constructor in order:
The followings have to be provided for the `RMQSink(…)` constructor in order:
1. The hostname
2. The queue name
3. Serialisation schema
Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `RabbitMQSink`
DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyRMQSink("localhost", "hello"));
stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
The followings have to be provided for the `MyRabbitMQSink()` constructor in order:
1. The hostname
1. The queue name
More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
......@@ -151,7 +151,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Create a DataStream using a user defined source function for arbitrary
* source functionality.
* source functionality. By default sources have a parallelism of 1.
* To enable parallel execution, the user defined source should implement
* ParallelSourceFunction or extend RichParallelSourceFunction.
* In these cases the resulting source will have the parallelism of the environment.
* To change this afterwards call DataStreamSource.setParallelism(int)
def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册