Commit 7cc24006 authored by ghermann, committed by mbalassi

[FLINK-1104] [streaming] Eliminated Tuple1 from connectors

Parent a3b02840
@@ -456,7 +456,7 @@ An abstract class providing an interface for receiving data from Kafka. By imple
* Write a deserializer function which processes the data coming from Kafka,
* Stop the source manually when necessary with one of the close functions.
-The implemented class must extend `KafkaSource`, for example: `KafkaSource<Tuple1<String>>`.
+The implemented class must extend `KafkaSource`, for example: `KafkaSource<String>`.
##### Constructor
An example of an implementation of a constructor:
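The constructor body itself is collapsed in this diff. As a minimal sketch, assuming `KafkaSource` provides a matching super-constructor, a `MyKafkaSource` with the signature used in `KafkaTopology` further down could look like:

~~~java
public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
    // Delegate the ZooKeeper quorum, consumer group, topic and thread count
    // to the KafkaSource base class (assumed super-constructor)
    super(zkQuorum, groupId, topicId, numThreads);
}
~~~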
@@ -472,12 +472,12 @@ An example of an implementation of a deserializer:
~~~java
@Override
-public Tuple1<String> deserialize(byte[] msg) {
+public String deserialize(byte[] msg) {
String s = new String(msg);
if(s.equals("q")){
closeWithoutSend();
}
-return new Tuple1<String>(s);
+return new String(s);
}
~~~
@@ -494,7 +494,7 @@ An abstract class providing an interface for sending data to Kafka. By implement
* Write a serializer function to send data in the desired form to Kafka,
* Stop the sink manually when necessary with one of the close functions.
-The implemented class must extend `KafkaSink`, for example `KafkaSink<Tuple1<String>, String>`.
+The implemented class must extend `KafkaSink`, for example `KafkaSink<String, String>`.
##### Constructor
An example of an implementation of a constructor:
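This constructor is likewise collapsed here. A minimal sketch, assuming `KafkaSink` provides a matching super-constructor and reusing the `MyKafkaSink(String topicId, String brokerAddr)` signature from `KafkaTopology` further down:

~~~java
public MyKafkaSink(String topicId, String brokerAddr) {
    // Pass the target topic and broker address to the KafkaSink base class
    // (assumed super-constructor)
    super(topicId, brokerAddr);
}
~~~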
@@ -510,11 +510,11 @@ An example of an implementation of a serializer:
~~~java
@Override
-public String serialize(Tuple1<String> tuple) {
-if(tuple.f0.equals("q")){
+public String serialize(String tuple) {
+if(tuple.equals("q")){
sendAndClose();
}
-return tuple.f0;
+return tuple;
}
~~~
@@ -524,8 +524,8 @@ The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`
#### Building A Topology
To use a Kafka connector as a source in Flink, call the `addSource()` function with a new instance of the class which extends `KafkaSource` as a parameter:
~~~java
-DataStream<Tuple1<String>> stream1 = env.
+DataStream<String> stream1 = env.
addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.print();
~~~
@@ -539,8 +539,8 @@ The following have to be provided for the `MyKafkaSource()` constructor in orde
Similarly, to use a Kafka connector as a sink in Flink, call the `addSink()` function with a new instance of the class which extends `KafkaSink`:
~~~java
-DataStream<Tuple1<String>> stream2 = env
+DataStream<String> stream2 = env
.addSource(new MySource())
.addSink(new MyKafkaSink("test", "localhost:9092"));
~~~
@@ -567,7 +567,7 @@ An abstract class providing an interface for receiving data from Flume. By imple
* Write a deserializer function which processes the data coming from Flume,
* Stop the source manually when necessary with one of the close functions.
-The implemented class must extend `FlumeSource`, for example: `FlumeSource<Tuple1<String>>`
+The implemented class must extend `FlumeSource`, for example: `FlumeSource<String>`
##### Constructor
An example of an implementation of a constructor:
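The constructor body is collapsed in this diff. A minimal sketch, assuming `FlumeSource` takes a host/port pair as suggested by the `new MyFlumeSource("localhost", 41414)` call further down; the parameter names are illustrative:

~~~java
public MyFlumeSource(String host, int port) {
    // Host and port of the Flume agent to read from (assumed super-constructor)
    super(host, port);
}
~~~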
@@ -583,10 +583,9 @@ An example of an implementation of a deserializer:
~~~java
@Override
-public Tuple1<String> deserialize(byte[] msg) {
+public String deserialize(byte[] msg) {
String s = (String) SerializationUtils.deserialize(msg);
-Tuple1<String> out = new Tuple1<String>();
-out.f0 = s;
+String out = s;
if (s.equals("q")) {
closeWithoutSend();
}
@@ -607,7 +606,7 @@ An abstract class providing an interface for sending data to Flume. By implement
* Write a serializer function to send data in the desired form to Flume,
* Stop the sink manually when necessary with one of the close functions.
-The implemented class must extend `FlumeSink`, for example `FlumeSink<Tuple1<String>, String>`.
+The implemented class must extend `FlumeSink`, for example `FlumeSink<String, String>`.
##### Constructor
An example of an implementation of a constructor:
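Again the body is collapsed; a minimal sketch, assuming `FlumeSink` takes a host/port pair as suggested by the `new MyFlumeSink("localhost", 42424)` call further down:

~~~java
public MyFlumeSink(String host, int port) {
    // Host and port of the Flume endpoint to write to (assumed super-constructor)
    super(host, port);
}
~~~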
@@ -623,8 +622,8 @@ An example of an implementation of a serializer:
~~~java
@Override
-public byte[] serialize(Tuple1<String> tuple) {
-if (tuple.f0.equals("q")) {
+public byte[] serialize(String tuple) {
+if (tuple.equals("q")) {
try {
sendAndClose();
} catch (Exception e) {
@@ -632,7 +631,7 @@ public byte[] serialize(Tuple1<String> tuple) {
+ host, e);
}
}
-return SerializationUtils.serialize(tuple.f0);
+return SerializationUtils.serialize(tuple);
}
~~~
@@ -642,8 +641,8 @@ The API provided is the [same](#flume_source_close) as the one for `FlumeSource`
#### Building A Topology
To use a Flume connector as a source in Flink, call the `addSource()` function with a new instance of the class which extends `FlumeSource` as a parameter:
~~~java
-DataStream<Tuple1<String>> dataStream1 = env
+DataStream<String> dataStream1 = env
.addSource(new MyFlumeSource("localhost", 41414))
.print();
~~~
@@ -655,8 +654,8 @@ The following have to be provided for the `MyFlumeSource()` constructor in orde
Similarly, to use a Flume connector as a sink in Flink, call the `addSink()` function with a new instance of the class which extends `FlumeSink`:
~~~java
-DataStream<Tuple1<String>> dataStream2 = env
+DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyFlumeSink("localhost", 42424));
~~~
@@ -713,7 +712,7 @@ An abstract class providing an interface for receiving data from RabbitMQ. By im
* Write a deserializer function which processes the data coming from RabbitMQ,
* Stop the source manually when necessary with one of the close functions.
-The implemented class must extend `RabbitMQSource`, for example: `RabbitMQSource<Tuple1<String>>`
+The implemented class must extend `RabbitMQSource`, for example: `RabbitMQSource<String>`
##### Constructor
An example of an implementation of a constructor:
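The body is collapsed in this diff; a minimal sketch, assuming `RabbitMQSource` takes a host and queue name as suggested by the `new MyRMQSource("localhost", "hello")` call further down; the parameter names are illustrative:

~~~java
public MyRMQSource(String hostName, String queueName) {
    // RabbitMQ host and the queue to consume from (assumed super-constructor)
    super(hostName, queueName);
}
~~~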
@@ -729,10 +728,9 @@ An example of an implementation of a deserializer:
~~~java
@Override
-public Tuple1<String> deserialize(byte[] t) {
+public String deserialize(byte[] t) {
String s = (String) SerializationUtils.deserialize(t);
-Tuple1<String> out = new Tuple1<String>();
-out.f0 = s;
+String out = s;
if (s.equals("q")) {
closeWithoutSend();
}
@@ -755,7 +753,7 @@ An abstract class providing an interface for sending data to RabbitMQ. By implem
* Write a serializer function to send data in the desired form to RabbitMQ
* Stop the sink manually when necessary with one of the close functions
-The implemented class must extend `RabbitMQSink`, for example: `RabbitMQSink<Tuple1<String>, String>`
+The implemented class must extend `RabbitMQSink`, for example: `RabbitMQSink<String, String>`
##### Constructor
An example of an implementation of a constructor:
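A minimal sketch, assuming `RabbitMQSink` takes a host and queue name as suggested by the `new MyRMQSink("localhost", "hello")` call further down; the parameter names are illustrative:

~~~java
public MyRMQSink(String hostName, String queueName) {
    // RabbitMQ host and the queue to publish to (assumed super-constructor)
    super(hostName, queueName);
}
~~~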
@@ -787,7 +785,7 @@ To use a RabbitMQ connector as a source in Flink, call the `addSource()` function
~~~java
@SuppressWarnings("unused")
-DataStream<Tuple1<String>> dataStream1 = env
+DataStream<String> dataStream1 = env
.addSource(new MyRMQSource("localhost", "hello"))
.print();
~~~
@@ -799,8 +797,8 @@ The following have to be provided for the `MyRabbitMQSource()` constructor in o
Similarly, to use a RabbitMQ connector as a sink in Flink, call the `addSink()` function with a new instance of the class which extends `RabbitMQSink`:
~~~java
-DataStream<Tuple1<String>> dataStream2 = env
+DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyRMQSink("localhost", "hello"));
~~~
@@ -835,7 +833,7 @@ To run the container, type:
sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq
~~~
-Now a terminal started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost's and the Docker container's port so RabbitMQ can communicate with the application through this.
+Now a terminal is running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost's and the Docker container's ports so RabbitMQ can communicate with the application through these.
To start the RabbitMQ server:
@@ -877,7 +875,7 @@ To run the container, type:
sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i flinkstreaming/flink-connectors-kafka
~~~
-Now a terminal started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost's and the Docker container's port so Kafka can communicate with the application through this.
+Now a terminal is running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost's and the Docker container's ports so Kafka can communicate with the application through these.
First, start a ZooKeeper instance in the background:
~~~batch
@@ -931,7 +929,7 @@ To run the container, type:
sudo docker run -t -i flinkstreaming/flink-connectors-flume
~~~
-Now a terminal started running from the image with all the necessary configurations to test run the Flume connector. The -p flag binds the localhost's and the Docker container's port so flume can communicate with the application through this.
+Now a terminal is running from the image with all the necessary configurations to test run the Flume connector. The -p flag binds the localhost's and the Docker container's ports so Flume can communicate with the application through these.
To get the latest version of Flink, type:
~~~batch
......
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -29,20 +28,20 @@ import org.slf4j.LoggerFactory;
public class KafkaTopology {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
-public static final class MySource implements SourceFunction<Tuple1<String>> {
+public static final class MySource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
@Override
-public void invoke(Collector<Tuple1<String>> collector) throws Exception {
+public void invoke(Collector<String> collector) throws Exception {
for (int i = 0; i < 10; i++) {
-collector.collect(new Tuple1<String>(Integer.toString(i)));
+collector.collect(new String(Integer.toString(i)));
}
collector.collect(new Tuple1<String>("q"));
collector.collect(new String("q"));
}
}
-public static final class MyKafkaSource extends KafkaSource<Tuple1<String>> {
+public static final class MyKafkaSource extends KafkaSource<String> {
private static final long serialVersionUID = 1L;
public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
@@ -50,17 +49,17 @@ public class KafkaTopology {
}
@Override
-public Tuple1<String> deserialize(byte[] msg) {
+public String deserialize(byte[] msg) {
String s = new String(msg);
if (s.equals("q")) {
closeWithoutSend();
}
-return new Tuple1<String>(s);
+return new String(s);
}
}
-public static final class MyKafkaSink extends KafkaSink<Tuple1<String>, String> {
+public static final class MyKafkaSink extends KafkaSink<String, String> {
private static final long serialVersionUID = 1L;
public MyKafkaSink(String topicId, String brokerAddr) {
@@ -68,20 +67,20 @@ public class KafkaTopology {
}
@Override
-public String serialize(Tuple1<String> tuple) {
-if (tuple.f0.equals("q")) {
+public String serialize(String tuple) {
+if (tuple.equals("q")) {
sendAndClose();
}
-return tuple.f0;
+return tuple;
}
}
-public static final class MyKafkaPrintSink implements SinkFunction<Tuple1<String>> {
+public static final class MyKafkaPrintSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
@Override
-public void invoke(Tuple1<String> value) {
+public void invoke(String value) {
if (LOG.isInfoEnabled()) {
LOG.info("String: <{}> arrived from Kafka", value);
}
@@ -95,12 +94,12 @@ public class KafkaTopology {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
-DataStream<Tuple1<String>> stream1 = env
+DataStream<String> stream1 = env
.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.addSink(new MyKafkaPrintSink());
@SuppressWarnings("unused")
-DataStream<Tuple1<String>> stream2 = env
+DataStream<String> stream2 = env
.addSource(new MySource())
.addSink(new MyKafkaSink("test", "localhost:9092"));
......
@@ -17,8 +17,7 @@
package org.apache.flink.streaming.api.function.source;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;
/**
* Source Function used to generate the number sequence
@@ -30,7 +29,6 @@ public class GenSequenceFunction implements SourceFunction<Long> {
long from;
long to;
-Tuple1<Long> outTuple = new Tuple1<Long>();
public GenSequenceFunction(long from, long to) {
this.from = from;
......