diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index bea09076a5161f211916fd9819799913b783c19c..c7e70602b90c247ea367458b12ee1d7fab1071f7 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -170,7 +170,9 @@ Usage: `operator.setParallelism(1)`
 
 ### Sources
 
-The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1 which can be increased using the `.setParallelism(dop)` method as we will see later at the operator settings.
+The user can connect to data streams through the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.
+
+To create parallel sources the user's source function needs to implement `ParallelSourceFunction` or extend `RichParallelSourceFunction`, in which case the source will have the parallelism of the environment. The degree of parallelism of such sources can be changed afterwards using `source.setParallelism(int dop)`, as shown in the sketch below.
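+
+For example, a parallel counting source could look like the following sketch (`MyParallelSource` and the emitted values are illustrative only, and the `run`/`cancel` methods assume the `SourceFunction` contract of this API version):
+
+~~~java
+public class MyParallelSource implements ParallelSourceFunction<Long> {
+
+    private volatile boolean isRunning = true;
+    private long counter = 0;
+
+    @Override
+    public void run(Collector<Long> collector) throws Exception {
+        // Each parallel instance of the source emits its own sequence of numbers
+        while (isRunning) {
+            collector.collect(counter++);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+}
+
+// The source runs with the parallelism of the environment,
+// which can be changed on the returned source:
+DataStream<Long> stream = env.addSource(new MyParallelSource()).setParallelism(4);
+~~~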
 
 There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
 
@@ -617,7 +619,7 @@ Stream connectors
 
 Connectors provide an interface for accessing data from various third party sources (message queues). Currently four connectors are natively supported, namely [Apache Kafka](https://kafka.apache.org/), [RabbitMQ](http://www.rabbitmq.com/), [Apache Flume](https://flume.apache.org/index.html) and [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis).
 
-Typically the connector packages consist of an abstract source and sink (with the exception of Twitter where only a source is provided). The burden of the user is to implement a subclass of these abstract classes specifying a serializer and a deserializer function.
+Typically the connector packages consist of a source and a sink class (with the exception of Twitter where only a source is provided). To use these connectors the user needs to pass serialization/deserialization schemas to their constructors for the desired types, or use one of the predefined schemas, as illustrated below.
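+
+For instance, a custom schema for integer messages might look like the following sketch (hypothetical: the exact schema interface and its method names should be checked against the connector package of the version in use):
+
+~~~java
+// Hypothetical sketch of a custom schema; assumes a DeserializationSchema
+// interface with a single byte[] -> T method, analogous to the predefined
+// SimpleStringSchema.
+public class IntegerDeserializationSchema implements DeserializationSchema<Integer> {
+
+    @Override
+    public Integer deserialize(byte[] message) {
+        // Assumes the producer wrote the number as a UTF-8 encoded decimal string
+        return Integer.valueOf(new String(message));
+    }
+}
+~~~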
 
 To run an application using one of these connectors usually additional third party components are required to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. [Docker containers](#docker-containers-for-connectors) are also provided encapsulating these services to aid users getting started with connectors.
 
@@ -631,106 +633,40 @@ This connector provides access to data streams from [Apache Kafka](https://kafka
 * If the Kafka zookeeper and server are running on a remote machine then in the config/server.properties file the advertised.host.name must be set to the machine's IP address.
 
 #### Kafka Source
-An abstract class providing an interface for receiving data from Kafka. By implementing the user must:
+A class providing an interface for receiving data from Kafka.
 
- * Write a constructor calling the constructor of the abstract class,
- * Write a deserializer function which processes the data coming from Kafka,
- * Stop the source manually when necessary with one of the close functions.
+The following arguments have to be provided for the `KafkaSource(…)` constructor in order:
 
-The implemented class must extend `KafkaSource`, for example: `KafkaSource`.
+1. The ZooKeeper hostname and port
+2. The group name
+3. The topic name
+4. The deserialization schema
 
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
-    super(zkQuorum, groupId, topicId, numThreads);
-}
-~~~
-
-##### Deserializer
-An example of an implementation of a deserializer:
+Example:
 
 ~~~java
-@Override
-public String deserialize(byte[] msg) {
-    String s = new String(msg);
-    if(s.equals("q")){
-        closeWithoutSend();
-    }
-    return new String(s);
-}
+DataStream<String> stream = env
+    .addSource(new KafkaSource<String>("localhost:2181", "group", "test", new SimpleStringSchema()));
+
+stream.print();
 ~~~
 
-The source closes when it receives the String `"q"`.
-
-###### Close
-Two types of close functions are available, namely `closeWithoutSend()` and `sendAndClose()`. The former closes the connection immediately and no further data will be sent, while the latter closes the connection only when the next message is sent after this call.
-
-In the example provided `closeWithoutSend()` is used because here the String `"q"` is meta-message indicating the end of the stream and there is no need to forward it.
-
 #### Kafka Sink
-An abstract class providing an interface for sending data to Kafka. By implementing the user must:
-
- * Write a constructor calling the constructor of the abstract class,
- * Write a serializer function to send data in the desired form to Kafka,
- * Stop the sink manually when necessary with one of the close functions.
+A class providing an interface for sending data to Kafka.
 
-The implemented class must extend `KafkaSink`, for example `KafkaSink`.
+The following arguments have to be provided for the `KafkaSink(…)` constructor in order:
 
-##### Constructor
-An example of an implementation of a constructor:
+1. The topic name
+2. The hostname and port of the Kafka broker
+3. The serialization schema
 
-~~~java
-public MyKafkaSink(String topicId, String brokerAddr) {
-    super(topicId, brokerAddr);
-}
-~~~
-
-##### Serializer
-An example of an implementation of a serializer:
-
-~~~java
-@Override
-public String serialize(String tuple) {
-    if(tuple.equals("q")){
-        sendAndClose();
-    }
-    return tuple;
-}
-~~~
-
-##### Close
-The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`.
-
-#### Building A Topology
-To use a Kafka connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `KafkaSource` as parameter:
-
-~~~java
-DataStream stream1 = env.
-    addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
-    .print();
-~~~
-
-The followings have to be provided for the `MyKafkaSource()` constructor in order:
-
-1. The hostname
-2. The group name
-3. The topic name
-4. The parallelism
-
-Similarly to use a Kafka connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `KafkaSink`:
+Example:
 
 ~~~java
-DataStream stream2 = env
-    .addSource(new MySource())
-    .addSink(new MyKafkaSink("test", "localhost:9092"));
+stream.addSink(new KafkaSink<String>("test", "localhost:9092", new SimpleStringSchema()));
 ~~~
 
-The followings have to be provided for the `MyKafkaSink()` constructor in order:
-
-1. The topic name
-2. The hostname
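+
+Putting the source and the sink together, a complete job that reads from one Kafka topic and writes to another could look like the following sketch (the topic names are illustrative only):
+
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Read strings from the "test" topic and forward them to the "test-out" topic
+DataStream<String> stream = env
+    .addSource(new KafkaSource<String>("localhost:2181", "group", "test", new SimpleStringSchema()));
+
+stream.addSink(new KafkaSink<String>("test-out", "localhost:9092", new SimpleStringSchema()));
+
+env.execute();
+~~~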
 
 More about Kafka can be found [here](https://kafka.apache.org/documentation.html).
 
@@ -741,114 +677,40 @@ More about Kafka can be found [here](https://kafka.apache.org/documentation.html
 
 This connector provides access to datastreams from [Apache Flume](http://flume.apache.org/).
 
 #### Installing Apache Flume
-[Download](http://flume.apache.org/download.html) Apache Flume. A configuration file is required for starting agents in Flume. A configuration file for running the example can be found [here](#config_file).
+[Download](http://flume.apache.org/download.html) Apache Flume. A configuration file is required for starting agents in Flume. A configuration file for running the example can be found [here](#config_file).
 
 #### Flume Source
-An abstract class providing an interface for receiving data from Flume. By implementing the user must:
+A class providing an interface for receiving data from Flume.
 
- * Write a constructor calling the constructor of the abstract class,
- * Write a deserializer function which processes the data coming from Flume,
- * Stop the source manually when necessary with one of the close functions.
+The following arguments have to be provided for the `FlumeSource(…)` constructor in order:
 
-The implemented class must extend `FlumeSource` for example: `FlumeSource`
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-MyFlumeSource(String host, int port) {
-    super(host, port);
-}
-~~~
+1. The hostname
+2. The port number
+3. The deserialization schema
 
-##### Deserializer
-An example of an implementation of a deserializer:
+Example:
 
 ~~~java
-@Override
-public String deserialize(byte[] msg) {
-    String s = (String) SerializationUtils.deserialize(msg);
-    String out = s;
-    if (s.equals("q")) {
-        closeWithoutSend();
-    }
-    return out;
-}
+DataStream<String> stream = env
+    .addSource(new FlumeSource<String>("localhost", 41414, new SimpleStringSchema()));
+
+stream.print();
 ~~~
 
-The source closes when it receives the String `"q"`.
-
-##### Close
-Two types of close functions are available, namely `closeWithoutSend()` and `sendAndClose()`.The former closes the connection immediately and no further data will be sent, while the latter closes the connection only when the next message is sent after this call.
-
-In the example `closeWithoutSend()` is used because here the String `"q"` is meta-message indicating the end of the stream and there is no need to forward it.
-
 #### Flume Sink
-An abstract class providing an interface for sending data to Flume. By implementing the user must:
-
-* Write a constructor calling the constructor of the abstract class,
-* Write a serializer function to send data in the desired form to Flume,
-* Stop the sink manually when necessary with one of the close functions.
+A class providing an interface for sending data to Flume.
 
-The implemented class must extend `FlumeSink`, for example `FlumeSink`.
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-public MyFlumeSink(String host, int port) {
-    super(host, port);
-}
-~~~
-
-##### Serializer
-An example of an implementation of a serializer.
-
-~~~java
-@Override
-public byte[] serialize(String tuple) {
-    if (tuple.equals("q")) {
-        try {
-            sendAndClose();
-        } catch (Exception e) {
-            new RuntimeException("Error while closing Flume connection with " + port + " at "
-                    + host, e);
-        }
-    }
-    return SerializationUtils.serialize(tuple);
-}
-~~~
-
-##### Close
-The API provided is the [same](#flume_source_close) as the one for `FlumeSource`.
-
-#### Building A Topology
-To use a Flume connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `FlumeSource` as parameter:
-
-~~~java
-DataStream dataStream1 = env
-    .addSource(new MyFlumeSource("localhost", 41414))
-    .print();
-~~~
-
-The followings have to be provided for the `MyFlumeSource()` constructor in order:
+The following arguments have to be provided for the `FlumeSink(…)` constructor in order:
 
 1. The hostname
 2. The port number
+3. The serialization schema
 
-Similarly to use a Flume connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `FlumeSink`
+Example:
 
 ~~~java
-DataStream dataStream2 = env
-    .fromElements("one", "two", "three", "four", "five", "q")
-    .addSink(new MyFlumeSink("localhost", 42424));
+stream.addSink(new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
 ~~~
 
-The followings have to be provided for the `MyFlumeSink()` constructor in order:
-
-1. The hostname
-2. The port number
-
 ##### Configuration file
 An example of a configuration file:
 
@@ -890,107 +752,38 @@ This connector provides access to datastreams from [RabbitMQ](http://www.rabbitm
 Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation the server automatically starts and the application connecting to RabbitMQ can be launched.
 
 #### RabbitMQ Source
-An abstract class providing an interface for receiving data from RabbitMQ. By implementing the user must:
-
-* Write a constructor calling the constructor of the abstract class,
-* Write a deserializer function which processes the data coming from RabbitMQ,
-* Stop the source manually when necessary with one of the close functions.
-The implemented class must extend `RabbitMQSource` for example: `RabbitMQSource`
+A class providing an interface for receiving data from RabbitMQ.
 
-##### Constructor
-An example of an implementation of a constructor:
+The following arguments have to be provided for the `RMQSource(…)` constructor in order:
 
-~~~java
-public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
-    super(HOST_NAME, QUEUE_NAME);
-}
-~~~
+1. The hostname
+2. The queue name
+3. The deserialization schema
 
-##### Deserializer
-An example of an implemetation of a deserializer:
+Example:
 
 ~~~java
-@Override
-public String deserialize(byte[] t) {
-    String s = (String) SerializationUtils.deserialize(t);
-    String out = s;
-    if (s.equals("q")) {
-        closeWithoutSend();
-    }
-    return out;
-}
+DataStream<String> stream = env
+    .addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()));
+
+stream.print();
 ~~~
 
-The source closes when it receives the String `"q"`.
-
-##### Close
-Two types of close functions are available, namely `closeWithoutSend()` and `sendAndClose()`. The former closes the connection immediately and no further data will be sent, while the latter closes the connection only when the next message is sent after this call.
-
-Closes the connection only when the next message is sent after this call.
-
-In the example `closeWithoutSend()` is used because here the String `"q"` is meta-message indicating the end of the stream and there is no need to forward it.
-
 #### RabbitMQ Sink
-An abstract class providing an interface for sending data to RabbitMQ. By implementing the user must:
-
-* Write a constructor calling the constructor of the abstract class
-* Write a serializer function to send data in the desired form to RabbitMQ
-* Stop the sink manually when necessary with one of the close functions
-
-The implemented class must extend `RabbitMQSink` for example: `RabbitMQSink`
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
-    super(HOST_NAME, QUEUE_NAME);
-}
-~~~
+A class providing an interface for sending data to RabbitMQ.
 
-##### Serializer
-An example of an implementation of a serializer.
-
-~~~java
-@Override
-public byte[] serialize(Tuple tuple) {
-    if (t.getField(0).equals("q")) {
-        sendAndClose();
-    }
-    return SerializationUtils.serialize(tuple.f0);
-}
-~~~
-
-##### Close
-The API provided is the [same](#rmq_source_close) as the one for `RabbitMQSource`.
-
-#### Building A Topology
-To use a RabbitMQ connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `RabbitMQSource` as parameter:
-
-~~~java
-DataStream dataStream1 = env
-    .addSource(new MyRMQSource("localhost", "hello"))
-    .print();
-~~~
-
-The followings have to be provided for the `MyRabbitMQSource()` constructor in order:
+The following arguments have to be provided for the `RMQSink(…)` constructor in order:
 
 1. The hostname
 2. The queue name
+3. The serialization schema
 
-Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `RabbitMQSink`
+Example:
 
 ~~~java
-DataStream dataStream2 = env
-    .fromElements("one", "two", "three", "four", "five", "q")
-    .addSink(new MyRMQSink("localhost", "hello"));
+stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
 ~~~
 
-The followings have to be provided for the `MyRabbitMQSink()` constructor in order:
-
-1. The hostname
-1. The queue name
 
 More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
 
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 361abd6c188bc787400a23832d5a01310276d67b..61a61098f0e37f962fd7f13333d6a6c2b1796ff9 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -151,7 +151,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Create a DataStream using a user defined source function for arbitrary
-   * source functionality.
+   * source functionality. By default sources have a parallelism of 1.
+   * To enable parallel execution, the user defined source should implement
+   * ParallelSourceFunction or extend RichParallelSourceFunction.
+   * In that case the resulting source will have the parallelism of the
+   * environment. To change this afterwards call DataStreamSource.setParallelism(int).
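+   *
+   * A hypothetical usage example (`MySource` is illustrative, not part of the API):
+   * {{{
+   *   // runs with the parallelism of the environment when MySource
+   *   // implements ParallelSourceFunction
+   *   val stream = env.addSource(new MySource())
+   * }}}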
    *
    */
   def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {