提交 3f1af0e3 编写于 作者: G Gyula Fora

[streaming] Updated connector type handling to support generic classes by...

[streaming] Updated connector type handling to support generic classes by GenericSourceFunction interface
上级 2425885c
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
/**
 * Base class for streaming connector sources. Holds the {@link DeserializationSchema}
 * used to turn the connector's raw byte payloads into {@code OUT} records, and exposes
 * the produced type via {@link GenericSourceFunction#getType()} so the runtime does not
 * have to extract it from the (generic) source class itself.
 *
 * @param <OUT> type of the records produced by this source
 */
public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
		GenericSourceFunction<OUT> {

	private static final long serialVersionUID = 1L;

	/** Schema that deserializes the connector's raw bytes into OUT records. */
	protected DeserializationSchema<OUT> schema;

	/**
	 * Creates a connector source that uses the given schema for deserialization.
	 *
	 * @param schema deserialization schema applied to incoming data
	 */
	public ConnectorSource(DeserializationSchema<OUT> schema) {
		this.schema = schema;
	}

	@Override
	public TypeInformation<OUT> getType() {
		// Derive the produced type from the schema's generic type parameter
		// (position 0 of DeserializationSchema<T>).
		TypeInformation<OUT> producedType = TypeExtractor.createTypeInfo(
				DeserializationSchema.class, schema.getClass(), 0, null, null);
		return producedType;
	}
}
......@@ -17,9 +17,10 @@
package org.apache.flink.streaming.connectors.flume;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.util.SerializationScheme;
import org.apache.flink.streaming.connectors.util.SerializationSchema;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
......@@ -38,12 +39,12 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
boolean initDone = false;
String host;
int port;
SerializationScheme<IN, byte[]> scheme;
SerializationSchema<IN, byte[]> scheme;
public FlumeSink(String host, int port, SerializationScheme<IN, byte[]> scheme) {
public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
this.host = host;
this.port = port;
this.scheme = scheme;
this.scheme = schema;
}
/**
......@@ -56,11 +57,6 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
@Override
public void invoke(IN value) {
if (!initDone) {
client = new FlinkRpcClientFacade();
client.init(host, port);
}
byte[] data = scheme.serialize(value);
client.sendDataToFlume(data);
......@@ -136,4 +132,10 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
public void close() {
client.client.close();
}
@Override
public void open(Configuration config) {
client = new FlinkRpcClientFacade();
client.init(host, port);
}
}
......@@ -20,8 +20,8 @@ package org.apache.flink.streaming.connectors.flume;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
import org.apache.flink.streaming.connectors.util.DeserializationScheme;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
......@@ -29,18 +29,17 @@ import org.apache.flume.source.AvroSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.Status;
public class FlumeSource<OUT> implements ParallelSourceFunction<OUT> {
public class FlumeSource<OUT> extends ConnectorSource<OUT> {
private static final long serialVersionUID = 1L;
String host;
String port;
DeserializationScheme<OUT> scheme;
volatile boolean finished = false;
FlumeSource(String host, int port, DeserializationScheme<OUT> scheme) {
FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
super(deserializationSchema);
this.host = host;
this.port = Integer.toString(port);
this.scheme = scheme;
}
public class MyAvroSource extends AvroSource {
......@@ -87,9 +86,9 @@ public class FlumeSource<OUT> implements ParallelSourceFunction<OUT> {
*/
private void collect(AvroFlumeEvent avroEvent) {
byte[] b = avroEvent.getBody().array();
OUT out = FlumeSource.this.scheme.deserialize(b);
OUT out = FlumeSource.this.schema.deserialize(b);
if (scheme.isEndOfStream(out)) {
if (schema.isEndOfStream(out)) {
FlumeSource.this.finished = true;
this.stop();
FlumeSource.this.notifyAll();
......
......@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.flume;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.util.SerializationScheme;
import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
import org.apache.flink.streaming.connectors.util.SerializationSchema;
import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
public class FlumeTopology {
......@@ -30,13 +30,13 @@ public class FlumeTopology {
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env.addSource(
new FlumeSource<String>("localhost", 41414, new SimpleStringScheme())).addSink(
new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
env.execute();
}
public static class StringToByteSerializer implements SerializationScheme<String, byte[]> {
public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
private static final long serialVersionUID = 1L;
......
......@@ -24,7 +24,7 @@ import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.util.SerializationScheme;
import org.apache.flink.streaming.connectors.util.SerializationSchema;
public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
......@@ -34,13 +34,13 @@ public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> {
private String topicId;
private String brokerAddr;
private boolean initDone = false;
private SerializationScheme<IN, OUT> scheme;
private SerializationSchema<IN, OUT> scheme;
public KafkaSink(String topicId, String brokerAddr,
SerializationScheme<IN, OUT> serializationScheme) {
SerializationSchema<IN, OUT> serializationSchema) {
this.topicId = topicId;
this.brokerAddr = brokerAddr;
this.scheme = serializationScheme;
this.scheme = serializationSchema;
}
......
......@@ -29,27 +29,26 @@ import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.util.DeserializationScheme;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.util.Collector;
public class KafkaSource<OUT> extends RichParallelSourceFunction<OUT> {
public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private static final long serialVersionUID = 1L;
private final String zkQuorum;
private final String groupId;
private final String topicId;
private ConsumerConnector consumer;
private DeserializationScheme<OUT> scheme;
OUT outTuple;
public KafkaSource(String zkQuorum, String groupId, String topicId,
DeserializationScheme<OUT> deserializationScheme) {
DeserializationSchema<OUT> deserializationSchema) {
super(deserializationSchema);
this.zkQuorum = zkQuorum;
this.groupId = groupId;
this.topicId = topicId;
this.scheme = deserializationScheme;
}
/**
......@@ -81,8 +80,8 @@ public class KafkaSource<OUT> extends RichParallelSourceFunction<OUT> {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
OUT out = scheme.deserialize(it.next().message());
if (scheme.isEndOfStream(out)) {
OUT out = schema.deserialize(it.next().message());
if (schema.isEndOfStream(out)) {
break;
}
collector.collect(out);
......
......@@ -19,15 +19,11 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaTopology {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
public static final class MySource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
......@@ -42,17 +38,6 @@ public class KafkaTopology {
}
}
public static final class MyKafkaPrintSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(String value) {
if (LOG.isInfoEnabled()) {
LOG.info("String: <{}> arrived from Kafka", value);
}
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
......@@ -61,11 +46,11 @@ public class KafkaTopology {
DataStream<String> stream1 = env
.addSource(
new KafkaSource<String>("localhost:2181", "group", "test",
new SimpleStringScheme())).addSink(new MyKafkaPrintSink());
new SimpleStringSchema())).print();
@SuppressWarnings("unused")
DataStream<String> stream2 = env.addSource(new MySource()).addSink(
new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringScheme()));
new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringSchema()));
env.execute();
}
......
......@@ -21,7 +21,7 @@ import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.util.SerializationScheme;
import org.apache.flink.streaming.connectors.util.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -39,12 +39,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
private SerializationScheme<IN, byte[]> scheme;
private SerializationSchema<IN, byte[]> scheme;
public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationScheme<IN, byte[]> scheme) {
public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
this.scheme = scheme;
this.scheme = schema;
}
/**
......@@ -56,6 +56,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
} catch (IOException e) {
throw new RuntimeException(e);
......@@ -71,7 +72,6 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
@Override
public void invoke(IN value) {
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
byte[] msg = scheme.serialize(value);
channel.basicPublish("", QUEUE_NAME, null, msg);
......
......@@ -21,8 +21,8 @@ import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.util.DeserializationScheme;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -32,7 +32,7 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> {
public class RMQSource<OUT> extends ConnectorSource<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
......@@ -46,11 +46,11 @@ public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> {
private transient QueueingConsumer consumer;
private transient QueueingConsumer.Delivery delivery;
private DeserializationScheme<OUT> scheme;
OUT out;
public RMQSource(String HOST_NAME, String QUEUE_NAME, DeserializationScheme<OUT> scheme) {
public RMQSource(String HOST_NAME, String QUEUE_NAME,
DeserializationSchema<OUT> deserializationSchema) {
super(deserializationSchema);
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
}
......@@ -92,8 +92,8 @@ public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> {
}
}
out = scheme.deserialize(delivery.getBody());
if (scheme.isEndOfStream(out)) {
out = schema.deserialize(delivery.getBody());
if (schema.isEndOfStream(out)) {
break;
} else {
collector.collect(out);
......
......@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.util.SerializationScheme;
import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
import org.apache.flink.streaming.connectors.util.SerializationSchema;
import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
public class RMQTopology {
......@@ -30,7 +30,7 @@ public class RMQTopology {
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env.addSource(
new RMQSource<String>("localhost", "hello", new SimpleStringScheme())).print();
new RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print();
@SuppressWarnings("unused")
DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
......@@ -40,7 +40,7 @@ public class RMQTopology {
env.execute();
}
public static class StringToByteSerializer implements SerializationScheme<String, byte[]> {
public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
private static final long serialVersionUID = 1L;
......
......@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.util;
import java.io.Serializable;
public interface DeserializationScheme<T> extends Serializable {
public interface DeserializationSchema<T> extends Serializable {
/**
* Deserializes the incoming data.
......
......@@ -17,8 +17,8 @@
package org.apache.flink.streaming.connectors.util;
public class RawScheme implements DeserializationScheme<byte[]>,
SerializationScheme<byte[], byte[]> {
public class RawSchema implements DeserializationSchema<byte[]>,
SerializationSchema<byte[], byte[]> {
private static final long serialVersionUID = 1L;
......
......@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.util;
import java.io.Serializable;
public interface SerializationScheme<T,R> extends Serializable {
public interface SerializationSchema<T,R> extends Serializable {
/**
* Serializes the incoming element to a specified type.
......
......@@ -17,8 +17,8 @@
package org.apache.flink.streaming.connectors.util;
public class SimpleStringScheme implements DeserializationScheme<String>,
SerializationScheme<String, String> {
public class SimpleStringSchema implements DeserializationSchema<String>,
SerializationSchema<String, String> {
private static final long serialVersionUID = 1L;
......
......@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.function.source.FileSourceFunction;
import org.apache.flink.streaming.api.function.source.FileStreamFunction;
import org.apache.flink.streaming.api.function.source.FromElementsFunction;
import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
......@@ -414,12 +415,17 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return the data stream constructed
*/
@SuppressWarnings("unchecked")
private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
TypeInformation<OUT> outTypeInfo, String sourceName) {
if (outTypeInfo == null) {
outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(),
0, null, null);
if (function instanceof GenericSourceFunction) {
outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
} else {
outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
function.getClass(), 0, null, null);
}
}
boolean isParallel = function instanceof ParallelSourceFunction;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.function.source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
 * Interface for source functions that can report the type of the records they
 * produce. Used so that sources with generic type parameters (e.g. connector
 * sources driven by a deserialization schema) can supply their output
 * {@link TypeInformation} directly instead of relying on reflective extraction
 * from the source class.
 *
 * @param <T> type of the records produced by the source
 */
public interface GenericSourceFunction<T> {

	/**
	 * Returns the type information of the records produced by this source.
	 *
	 * @return type information for the produced records
	 */
public TypeInformation<T> getType();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册