提交 2425885c 编写于 作者: G Gyula Fora

[streaming] Source parallelism + connector rework

上级 b9d0241f
......@@ -17,18 +17,19 @@
package org.apache.flink.streaming.connectors.flume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.util.SerializationScheme;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class FlumeSink<IN> implements SinkFunction<IN> {
public class FlumeSink<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
......@@ -37,17 +38,17 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
boolean initDone = false;
String host;
int port;
private boolean sendAndClose = false;
private boolean closeWithoutSend = false;
SerializationScheme<IN, byte[]> scheme;
public FlumeSink(String host, int port) {
public FlumeSink(String host, int port, SerializationScheme<IN, byte[]> scheme) {
this.host = host;
this.port = port;
this.scheme = scheme;
}
/**
* Receives tuples from the Apache Flink {@link DataStream} and forwards them to
* Apache Flume.
* Receives tuples from the Apache Flink {@link DataStream} and forwards
* them to Apache Flume.
*
* @param value
* The tuple arriving from the datastream
......@@ -60,25 +61,11 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
client.init(host, port);
}
byte[] data = serialize(value);
if (!closeWithoutSend) {
client.sendDataToFlume(data);
}
if (sendAndClose) {
client.close();
}
byte[] data = scheme.serialize(value);
client.sendDataToFlume(data);
}
/**
* Serializes tuples into byte arrays.
*
* @param value
* The tuple used for the serialization
* @return The serialized byte array.
*/
public abstract byte[] serialize(IN value);
private class FlinkRpcClientFacade {
private RpcClient client;
private String hostname;
......@@ -99,7 +86,8 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
int initCounter = 0;
while (true) {
if (initCounter >= 90) {
throw new RuntimeException("Cannot establish connection with" + port + " at " + host);
throw new RuntimeException("Cannot establish connection with" + port + " at "
+ host);
}
try {
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
......@@ -142,28 +130,10 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
}
}
/**
* Closes the RpcClient.
*/
public void close() {
client.close();
}
}
/**
* Closes the connection only when the next message is sent after this call.
*/
public void sendAndClose() {
sendAndClose = true;
}
/**
* Closes the connection immediately and no further data will be sent.
*/
public void closeWithoutSend() {
client.close();
closeWithoutSend = true;
@Override
public void close() {
client.client.close();
}
}
......@@ -20,7 +20,8 @@ package org.apache.flink.streaming.connectors.flume;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
import org.apache.flink.streaming.connectors.util.DeserializationScheme;
import org.apache.flink.util.Collector;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
......@@ -28,15 +29,18 @@ import org.apache.flume.source.AvroSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.Status;
public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
public class FlumeSource<OUT> implements ParallelSourceFunction<OUT> {
private static final long serialVersionUID = 1L;
String host;
String port;
DeserializationScheme<OUT> scheme;
volatile boolean finished = false;
FlumeSource(String host, int port) {
FlumeSource(String host, int port, DeserializationScheme<OUT> scheme) {
this.host = host;
this.port = Integer.toString(port);
this.scheme = scheme;
}
public class MyAvroSource extends AvroSource {
......@@ -48,7 +52,8 @@ public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
*
* @param avroEvent
* The event that should be sent to the dataStream
* @return A {@link Status}.OK message if sending the event was successful.
* @return A {@link Status}.OK message if sending the event was
* successful.
*/
@Override
public Status append(AvroFlumeEvent avroEvent) {
......@@ -82,30 +87,21 @@ public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
*/
private void collect(AvroFlumeEvent avroEvent) {
byte[] b = avroEvent.getBody().array();
OUT tuple = FlumeSource.this.deserialize(b);
if (!closeWithoutSend) {
collector.collect(tuple);
}
if (sendAndClose) {
sendDone = true;
OUT out = FlumeSource.this.scheme.deserialize(b);
if (scheme.isEndOfStream(out)) {
FlumeSource.this.finished = true;
this.stop();
FlumeSource.this.notifyAll();
} else {
collector.collect(out);
}
}
}
MyAvroSource avroSource;
private volatile boolean closeWithoutSend = false;
private boolean sendAndClose = false;
private volatile boolean sendDone = false;
/**
* Deserializes the incoming data.
*
* @param message
* The incoming message in a byte array
* @return The deserialized message in the required format.
*/
public abstract OUT deserialize(byte[] message);
/**
* Configures the AvroSource. Also sets the collector so the application can
......@@ -138,26 +134,9 @@ public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
public void invoke(Collector<OUT> collector) throws Exception {
configureAvroSource(collector);
avroSource.start();
while (true) {
if (closeWithoutSend || sendDone) {
break;
}
while (!finished) {
this.wait();
}
avroSource.stop();
}
/**
* Closes the connection only when the next message is sent after this call.
*/
public void sendAndClose() {
sendAndClose = true;
}
/**
* Closes the connection immediately and no further data will be sent.
*/
public void closeWithoutSend() {
closeWithoutSend = true;
}
}
......@@ -17,80 +17,33 @@
package org.apache.flink.streaming.connectors.flume;
import org.apache.commons.lang.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.connectors.util.SerializationScheme;
import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
public class FlumeTopology {
private static final Logger LOG = LoggerFactory.getLogger(FlumeTopology.class);
public static class MyFlumeSink extends FlumeSink<String> {
private static final long serialVersionUID = 1L;
public static void main(String[] args) throws Exception {
public MyFlumeSink(String host, int port) {
super(host, port);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@Override
public byte[] serialize(String tuple) {
if (tuple.equals("q")) {
try {
sendAndClose();
} catch (Exception e) {
throw new RuntimeException("Error while closing Flume connection with " + port
+ " at " + host, e);
}
}
return SerializationUtils.serialize(tuple);
}
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env.addSource(
new FlumeSource<String>("localhost", 41414, new SimpleStringScheme())).addSink(
new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
env.execute();
}
public static final class MyFlumePrintSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(String value) {
if (LOG.isInfoEnabled()) {
LOG.info("String: <{}> arrived from Flume", value);
}
}
}
public static class StringToByteSerializer implements SerializationScheme<String, byte[]> {
public static class MyFlumeSource extends FlumeSource<String> {
private static final long serialVersionUID = 1L;
MyFlumeSource(String host, int port) {
super(host, port);
}
@Override
public String deserialize(byte[] msg) {
String s = (String) SerializationUtils.deserialize(msg);
if (s.equals("q")) {
closeWithoutSend();
}
return s;
public byte[] serialize(String element) {
return element.getBytes();
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env.addSource(new MyFlumeSource("localhost", 41414))
.addSink(new MyFlumePrintSink());
@SuppressWarnings("unused")
DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
"q").addSink(new MyFlumeSink("localhost", 42424));
env.execute();
}
}
......@@ -23,22 +23,24 @@ import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.util.SerializationScheme;
public abstract class KafkaSink<IN, OUT> implements SinkFunction<IN> {
public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private kafka.javaapi.producer.Producer<Integer, OUT> producer;
private Properties props;
private String topicId;
private String brokerAddr;
private boolean sendAndClose = false;
private boolean closeWithoutSend = false;
private boolean initDone = false;
private SerializationScheme<IN, OUT> scheme;
public KafkaSink(String topicId, String brokerAddr) {
public KafkaSink(String topicId, String brokerAddr,
SerializationScheme<IN, OUT> serializationScheme) {
this.topicId = topicId;
this.brokerAddr = brokerAddr;
this.scheme = serializationScheme;
}
......@@ -60,49 +62,22 @@ public abstract class KafkaSink<IN, OUT> implements SinkFunction<IN> {
/**
* Called when new data arrives to the sink, and forwards it to Kafka.
*
* @param value
* @param next
* The incoming data
*/
@Override
public void invoke(IN value) {
public void invoke(IN next) {
if (!initDone) {
initialize();
}
OUT out = serialize(value);
KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(topicId, out);
producer.send(new KeyedMessage<Integer, OUT>(topicId, scheme.serialize(next)));
if (!closeWithoutSend) {
producer.send(data);
}
if (sendAndClose) {
producer.close();
}
}
/**
* Serializes tuples into byte arrays.
*
* @param value
* The tuple used for the serialization
* @return The serialized byte array.
*/
public abstract OUT serialize(IN value);
/**
* Closes the connection immediately and no further data will be sent.
*/
public void closeWithoutSend() {
@Override
public void close() {
producer.close();
closeWithoutSend = true;
}
/**
* Closes the connection only when the next message is sent after this call.
*/
public void sendAndClose() {
sendAndClose = true;
}
}
......@@ -17,7 +17,7 @@
package org.apache.flink.streaming.connectors.kafka;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
......@@ -27,28 +27,29 @@ import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.util.DeserializationScheme;
import org.apache.flink.util.Collector;
public abstract class KafkaSource<OUT> extends RichSourceFunction<OUT> {
public class KafkaSource<OUT> extends RichParallelSourceFunction<OUT> {
private static final long serialVersionUID = 1L;
private final String zkQuorum;
private final String groupId;
private final String topicId;
private final int numThreads;
private ConsumerConnector consumer;
private boolean closeWithoutSend = false;
private boolean sendAndClose = false;
private DeserializationScheme<OUT> scheme;
OUT outTuple;
public KafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
public KafkaSource(String zkQuorum, String groupId, String topicId,
DeserializationScheme<OUT> deserializationScheme) {
this.zkQuorum = zkQuorum;
this.groupId = groupId;
this.topicId = topicId;
this.numThreads = numThreads;
this.scheme = deserializationScheme;
}
/**
......@@ -58,7 +59,7 @@ public abstract class KafkaSource<OUT> extends RichSourceFunction<OUT> {
Properties props = new Properties();
props.put("zookeeper.connect", zkQuorum);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.session.timeout.ms", "2000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
......@@ -72,48 +73,25 @@ public abstract class KafkaSource<OUT> extends RichSourceFunction<OUT> {
*/
@Override
public void invoke(Collector<OUT> collector) throws Exception {
initializeConnection();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topicId, numThreads);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
.createMessageStreams(Collections.singletonMap(topicId, 1));
KafkaStream<byte[], byte[]> stream = consumerMap.get(topicId).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
OUT out = deserialize(it.next().message());
if (closeWithoutSend) {
OUT out = scheme.deserialize(it.next().message());
if (scheme.isEndOfStream(out)) {
break;
}
collector.collect(out);
if (sendAndClose) {
break;
}
}
consumer.shutdown();
}
/**
* Deserializes the incoming data.
*
* @param message
* The incoming message in a byte array
* @return The deserialized message in the required format.
*/
public abstract OUT deserialize(byte[] message);
/**
* Closes the connection immediately and no further data will be sent.
*/
public void closeWithoutSend() {
closeWithoutSend = true;
}
/**
* Closes the connection only when the next message is sent after this call.
*/
public void sendAndClose() {
sendAndClose = true;
@Override
public void open(Configuration config) {
initializeConnection();
}
}
......@@ -21,13 +21,14 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaTopology {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
public static final class MySource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
......@@ -41,41 +42,6 @@ public class KafkaTopology {
}
}
public static final class MyKafkaSource extends KafkaSource<String> {
private static final long serialVersionUID = 1L;
public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
super(zkQuorum, groupId, topicId, numThreads);
}
@Override
public String deserialize(byte[] msg) {
String s = new String(msg);
if (s.equals("q")) {
closeWithoutSend();
}
return new String(s);
}
}
public static final class MyKafkaSink extends KafkaSink<String, String> {
private static final long serialVersionUID = 1L;
public MyKafkaSink(String topicId, String brokerAddr) {
super(topicId, brokerAddr);
}
@Override
public String serialize(String tuple) {
if (tuple.equals("q")) {
sendAndClose();
}
return tuple;
}
}
public static final class MyKafkaPrintSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
......@@ -87,21 +53,19 @@ public class KafkaTopology {
}
}
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<String> stream1 = env
.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1)).setParallelism(1)
.addSink(new MyKafkaPrintSink());
.addSource(
new KafkaSource<String>("localhost:2181", "group", "test",
new SimpleStringScheme())).addSink(new MyKafkaPrintSink());
@SuppressWarnings("unused")
DataStream<String> stream2 = env
.addSource(new MySource())
.addSink(new MyKafkaSink("test", "localhost:9092"));
DataStream<String> stream2 = env.addSource(new MySource()).addSink(
new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringScheme()));
env.execute();
}
......
......@@ -19,32 +19,32 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.util.SerializationScheme;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public abstract class RMQSink<IN> implements SinkFunction<IN> {
public class RMQSink<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
private boolean sendAndClose = false;
private boolean closeWithoutSend = false;
private String QUEUE_NAME;
private String HOST_NAME;
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
private boolean initDone = false;
private SerializationScheme<IN, byte[]> scheme;
public RMQSink(String HOST_NAME, String QUEUE_NAME) {
public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationScheme<IN, byte[]> scheme) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
this.scheme = scheme;
}
/**
......@@ -60,8 +60,6 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
} catch (IOException e) {
throw new RuntimeException(e);
}
initDone = true;
}
/**
......@@ -72,36 +70,20 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
*/
@Override
public void invoke(IN value) {
if (!initDone) {
initializeConnection();
}
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
byte[] msg = serialize(value);
if (!closeWithoutSend) {
channel.basicPublish("", QUEUE_NAME, null, msg);
}
byte[] msg = scheme.serialize(value);
channel.basicPublish("", QUEUE_NAME, null, msg);
} catch (IOException e) {
if (LOG.isErrorEnabled()) {
LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
}
}
if (sendAndClose) {
closeChannel();
}
}
/**
* Serializes tuples into byte arrays.
*
* @param value
* The tuple used for the serialization
* @return The serialized byte array.
*/
public abstract byte[] serialize(IN value);
/**
* Closes the connection.
*/
......@@ -110,25 +92,20 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
channel.close();
connection.close();
} catch (IOException e) {
throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME + " at "
+ HOST_NAME, e);
throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+ " at " + HOST_NAME, e);
}
}
/**
* Closes the connection immediately and no further data will be sent.
*/
public void closeWithoutSend() {
closeChannel();
closeWithoutSend = true;
@Override
public void open(Configuration config) {
initializeConnection();
}
/**
* Closes the connection only when the next message is sent after this call.
*/
public void sendAndClose() {
sendAndClose = true;
@Override
public void close() {
closeChannel();
}
}
......@@ -19,8 +19,10 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.util.DeserializationScheme;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -30,15 +32,13 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
private final String QUEUE_NAME;
private final String HOST_NAME;
private boolean closeWithoutSend = false;
private boolean sendAndClose = false;
private transient ConnectionFactory factory;
private transient Connection connection;
......@@ -46,9 +46,11 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
private transient QueueingConsumer consumer;
private transient QueueingConsumer.Delivery delivery;
OUT outTuple;
private DeserializationScheme<OUT> scheme;
public RMQSource(String HOST_NAME, String QUEUE_NAME) {
OUT out;
public RMQSource(String HOST_NAME, String QUEUE_NAME, DeserializationScheme<OUT> scheme) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
}
......@@ -79,7 +81,6 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
*/
@Override
public void invoke(Collector<OUT> collector) throws Exception {
initializeConnection();
while (true) {
......@@ -91,17 +92,23 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
}
}
outTuple = deserialize(delivery.getBody());
if (closeWithoutSend) {
out = scheme.deserialize(delivery.getBody());
if (scheme.isEndOfStream(out)) {
break;
} else {
collector.collect(outTuple);
}
if (sendAndClose) {
break;
collector.collect(out);
}
}
}
@Override
public void open(Configuration config) {
initializeConnection();
}
@Override
public void close() {
try {
connection.close();
} catch (IOException e) {
......@@ -111,27 +118,4 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
}
/**
* Deserializes the incoming data.
*
* @param message
* The incoming message in a byte array
* @return The deserialized message in the required format.
*/
public abstract OUT deserialize(byte[] message);
/**
* Closes the connection immediately and no further data will be sent.
*/
public void closeWithoutSend() {
closeWithoutSend = true;
}
/**
* Closes the connection only when the next message is sent after this call.
*/
public void sendAndClose() {
sendAndClose = true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.commons.lang.SerializationUtils;
*/
package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RMQTopology {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
public static final class MyRMQSink extends RMQSink<String> {
public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
}
private static final long serialVersionUID = 1L;
@Override
public byte[] serialize(String t) {
if (t.equals("q")) {
sendAndClose();
}
return SerializationUtils.serialize((String) t);
}
}
import org.apache.flink.streaming.connectors.util.SerializationScheme;
import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
public class RMQTopology {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env.addSource(
new RMQSource<String>("localhost", "hello", new SimpleStringScheme())).print();
@SuppressWarnings("unused")
DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
"q").addSink(
new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
env.execute();
}
public static class StringToByteSerializer implements SerializationScheme<String, byte[]> {
public static final class MyRMQPrintSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(String value) {
if (LOG.isInfoEnabled()) {
LOG.info("String: <{}> arrived from RMQ", value);
}
public byte[] serialize(String element) {
return element.getBytes();
}
}
public static final class MyRMQSource extends RMQSource<String> {
public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
}
private static final long serialVersionUID = 1L;
@Override
public String deserialize(byte[] t) {
String s = (String) SerializationUtils.deserialize(t);
if (s.equals("q")) {
closeWithoutSend();
}
return s;
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env
.addSource(new MyRMQSource("localhost", "hello"))
.addSink(new MyRMQPrintSink());
@SuppressWarnings("unused")
DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyRMQSink("localhost", "hello"));
env.execute();
}
}
}
......@@ -25,7 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
......@@ -43,7 +43,7 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
* Implementation of {@link SourceFunction} specialized to emit tweets from
* Twitter. It can connect to Twitter Streaming API, collect tweets and
*/
public class TwitterSource extends RichSourceFunction<String> {
public class TwitterSource extends RichParallelSourceFunction<String> {
private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.util;
import java.io.Serializable;
public interface DeserializationScheme<T> extends Serializable {
/**
* Deserializes the incoming data.
*
* @param message
* The incoming message in a byte array
* @return The deserialized message in the required format.
*/
public T deserialize(byte[] message);
/**
* Method to decide whether the element signals the end of the stream. If
* true is returned the element won't be emitted
*
* @param nextElement
* The element to test for end signal
* @return The end signal, if true the stream shuts down
*/
public boolean isEndOfStream(T nextElement);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.util;
public class RawScheme implements DeserializationScheme<byte[]>,
SerializationScheme<byte[], byte[]> {
private static final long serialVersionUID = 1L;
@Override
public byte[] deserialize(byte[] message) {
return message;
}
@Override
public boolean isEndOfStream(byte[] nextElement) {
return false;
}
@Override
public byte[] serialize(byte[] element) {
return element;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.util;
import java.io.Serializable;
public interface SerializationScheme<T,R> extends Serializable {
/**
* Serializes the incoming element to a specified type.
*
* @param element
* The incoming element to be serialized
* @return The serialized element.
*/
public R serialize(T element);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.util;
public class SimpleStringScheme implements DeserializationScheme<String>,
SerializationScheme<String, String> {
private static final long serialVersionUID = 1L;
@Override
public String deserialize(byte[] message) {
return new String(message);
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String serialize(String element) {
return element;
}
}
......@@ -1005,7 +1005,7 @@ public class DataStream<OUT> {
protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) {
DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);
DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null, true);
jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID,
degreeOfParallelism, waitTime);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* The DataStreamSource represents the starting point of a DataStream.
*
* @param <OUT>
* Type of the DataStream created.
*/
public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> outTypeInfo) {
super(environment, operatorType, outTypeInfo);
}
public DataStreamSource(DataStream<OUT> dataStream) {
super(dataStream);
}
}
/**
* The DataStreamSource represents the starting point of a DataStream.
*
* @param <OUT>
* Type of the DataStream created.
*/
public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
boolean isParallel;
public DataStreamSource(StreamExecutionEnvironment environment, String operatorType,
TypeInformation<OUT> outTypeInfo, boolean isParallel) {
super(environment, operatorType, outTypeInfo);
this.isParallel = isParallel;
}
@Override
public DataStreamSource<OUT> setParallelism(int dop) {
if (dop > 1 && !isParallel) {
throw new IllegalArgumentException("Source: " + this.id + " is not a parallel source");
} else {
return (DataStreamSource<OUT>) super.setParallelism(dop);
}
}
}
......@@ -40,6 +40,8 @@ import org.apache.flink.streaming.api.function.source.FileSourceFunction;
import org.apache.flink.streaming.api.function.source.FileStreamFunction;
import org.apache.flink.streaming.api.function.source.FromElementsFunction;
import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
......@@ -222,7 +224,7 @@ public abstract class StreamExecutionEnvironment {
*/
public DataStreamSource<String> readTextStream(String filePath) {
checkIfFileExists(filePath);
return addSource(new FileStreamFunction(filePath));
return addSource(new FileStreamFunction(filePath), null, "textStream");
}
private static void checkIfFileExists(String filePath) {
......@@ -260,14 +262,10 @@ public abstract class StreamExecutionEnvironment {
}
TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data[0]);
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
outTypeInfo);
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
null, outTypeInfo, "source", 1);
return returnStream;
return addSource(function, outTypeInfo, "elements");
}
/**
......@@ -292,13 +290,9 @@ public abstract class StreamExecutionEnvironment {
}
TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "collection",
outTypeInfo);
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
new FromElementsFunction<OUT>(data)), null, outTypeInfo, "source", 1);
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
return returnStream;
return addSource(function, outTypeInfo, "collection");
}
/**
......@@ -316,7 +310,8 @@ public abstract class StreamExecutionEnvironment {
* @return A DataStream, containing the strings received from socket.
*/
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter));
return addSource(new SocketTextStreamFunction(hostname, port, delimiter), null,
"socketStrean");
}
/**
......@@ -348,20 +343,26 @@ public abstract class StreamExecutionEnvironment {
if (from > to) {
throw new IllegalArgumentException("Start of sequence must not be greater than the end");
}
return addSource(new GenSequenceFunction(from, to));
return addSource(new GenSequenceFunction(from, to), null, "sequence");
}
private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
TypeInformation<String> typeInfo) {
FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
DataStreamSource<String> returnStream = addSource(function);
DataStreamSource<String> returnStream = addSource(function, null, "fileSource");
jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat);
return returnStream;
}
/**
* Create a DataStream using a user defined source function for arbitrary
* source functionality.
* source functionality.</p> By default sources have a parallelism of 1. To
* enable parallel execution, the user defined source should implement
* {@link ParallelSourceFunction} or extend
* {@link RichParallelSourceFunction}. In these cases the resulting source
* will have the parallelism of the environment. To change this afterwards
* call {@link DataStreamSource#setParallelism(int)}
*
*
* @param function
* the user defined function
......@@ -370,22 +371,19 @@ public abstract class StreamExecutionEnvironment {
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
function.getClass(), 0, null, null);
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
null, outTypeInfo, "source", 1);
return returnStream;
return addSource(function, null);
}
/**
* Ads a data source with a custom type information thus opening a
* {@link DataStream}. Only in very special cases does the user need to
* support type information. Otherwise use
* {@link #addSource(SourceFunction)}
* {@link #addSource(SourceFunction)} </p> By default sources have a
* parallelism of 1. To enable parallel execution, the user defined source
* should implement {@link ParallelSourceFunction} or extend
* {@link RichParallelSourceFunction}. In these cases the resulting source
* will have the parallelism of the environment. To change this afterwards
* call {@link DataStreamSource#setParallelism(int)}
*
* @param function
* the user defined function
......@@ -397,11 +395,41 @@ public abstract class StreamExecutionEnvironment {
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
TypeInformation<OUT> outTypeInfo) {
return addSource(function, outTypeInfo, function.getClass().getName());
}
/**
* Ads a data source with a custom type information thus opening a
* {@link DataStream}. Only in very special cases does the user need to
* support type information. Otherwise use
* {@link #addSource(SourceFunction)}
*
* @param function
* the user defined function
* @param outTypeInfo
* the user defined type information for the stream
* @param sourceName
* Name of the data source
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
TypeInformation<OUT> outTypeInfo, String sourceName) {
if (outTypeInfo == null) {
outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(),
0, null, null);
}
boolean isParallel = function instanceof ParallelSourceFunction;
int dop = isParallel ? getDegreeOfParallelism() : 1;
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, sourceName,
outTypeInfo, isParallel);
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
null, outTypeInfo, "source", 1);
null, outTypeInfo, sourceName, dop);
return returnStream;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.function.source;
*/
package org.apache.flink.streaming.api.function.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
* Source Function used to generate the number sequence
*
*/
public class GenSequenceFunction implements SourceFunction<Long> {
private static final long serialVersionUID = 1L;
long from;
long to;
public GenSequenceFunction(long from, long to) {
this.from = from;
this.to = to;
}
@Override
public void invoke(Collector<Long> collector) throws Exception {
for (long i = from; i <= to; i++) {
collector.collect(i);
}
}
}
import org.apache.flink.util.NumberSequenceIterator;
/**
* Source Function used to generate the number sequence
*
*/
public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
private static final long serialVersionUID = 1L;
private NumberSequenceIterator fullIterator;
private NumberSequenceIterator splitIterator;
public GenSequenceFunction(long from, long to) {
fullIterator = new NumberSequenceIterator(from, to);
}
@Override
public void invoke(Collector<Long> collector) throws Exception {
while (splitIterator.hasNext()) {
collector.collect(splitIterator.next());
}
}
@Override
public void open(Configuration config) {
int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.function.source;
import org.apache.flink.util.Collector;
public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
public void invoke(Collector<OUT> collector) throws Exception;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.function.source;
import org.apache.flink.api.common.functions.AbstractRichFunction;
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction implements
ParallelSourceFunction<OUT> {
private static final long serialVersionUID = 1L;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.function.source;
import org.apache.flink.api.common.functions.AbstractRichFunction;
public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements
SourceFunction<OUT> {
private static final long serialVersionUID = 1L;
}
*/
package org.apache.flink.streaming.api.function.source;
import org.apache.flink.api.common.functions.AbstractRichFunction;
public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements
SourceFunction<OUT> {
private static final long serialVersionUID = 1L;
}
......@@ -28,7 +28,6 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.function.source.FromElementsFunction;
import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
import org.apache.flink.streaming.util.MockCollector;
import org.apache.flink.streaming.util.MockSource;
......@@ -39,21 +38,16 @@ public class SourceTest {
@Test
public void fromElementsTest() {
List<Integer> expectedList = Arrays.asList(1, 2, 3);
List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(1, 2, 3));
List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(1,
2, 3));
assertEquals(expectedList, actualList);
}
@Test
public void fromCollectionTest() {
List<Integer> expectedList = Arrays.asList(1, 2, 3);
List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(Arrays.asList(1, 2, 3)));
assertEquals(expectedList, actualList);
}
@Test
public void genSequenceTest() {
List<Long> expectedList = Arrays.asList(1L, 2L, 3L);
List<Long> actualList = MockSource.createAndExecute(new GenSequenceFunction(1, 3));
List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(
Arrays.asList(1, 2, 3)));
assertEquals(expectedList, actualList);
}
......@@ -62,14 +56,15 @@ public class SourceTest {
List<String> expectedList = Arrays.asList("a", "b", "c");
List<String> actualList = new ArrayList<String>();
byte[] data = {'a', '\n', 'b', '\n', 'c', '\n'};
byte[] data = { 'a', '\n', 'b', '\n', 'c', '\n' };
Socket socket = mock(Socket.class);
when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
when(socket.isClosed()).thenReturn(false);
when(socket.isConnected()).thenReturn(true);
new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>(actualList), socket);
new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>(
actualList), socket);
assertEquals(expectedList, actualList);
}
}
......@@ -145,14 +145,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
data: Seq[T]): DataStream[T] = {
Validate.notNull(data, "Data must not be null.")
val typeInfo = implicitly[TypeInformation[T]]
val returnStream = new DataStreamSource[T](javaEnv,
"elements", typeInfo);
javaEnv.getJobGraphBuilder.addStreamVertex(returnStream.getId(),
new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions
.asJavaCollection(data))), null, typeInfo,
"source", 1);
returnStream
val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
.asJavaCollection(data))
javaEnv.addSource(sourceFunction, typeInfo)
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册