From ce7a8996a5b1140a8a6b731b7ed86a233227d009 Mon Sep 17 00:00:00 2001 From: gyfora Date: Mon, 14 Jul 2014 16:29:16 +0200 Subject: [PATCH] [streaming] addSink added to newapi --- .../api/datastream/DataStream.java | 6 ++- .../api/datastream/SinkFunction.java | 13 +++++ .../api/datastream/SinkInvokable.java | 41 +++++++++++++++ .../StreamExecutionEnvironment.java | 50 +++++++++++-------- .../api/invokable/UserSinkInvokable.java | 1 + .../StreamComponentHelper.java | 5 +- .../streaming/api/FlatMapTest.java | 19 +++++-- .../stratosphere/streaming/api/MapTest.java | 8 +-- 8 files changed, 111 insertions(+), 32 deletions(-) create mode 100755 flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkFunction.java create mode 100755 flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkInvokable.java diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java index b2965792de0..ec192874aeb 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java @@ -78,12 +78,16 @@ public class DataStream { public DataStream flatMap(FlatMapFunction flatMapper) { return context.addFlatMapFunction(this, flatMapper); } + + public DataStream addSink(SinkFunction sinkFunction) { + return context.addSink(this, sinkFunction); + } public DataStream map(MapFunction mapper) { return context.addMapFunction(this, mapper); } - public DataStream addDummySink() { + public DataStream addDummySink() { return context.addDummySink(this); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkFunction.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkFunction.java new file mode 100755 index 00000000000..88358f1a62a --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkFunction.java @@ -0,0 +1,13 @@ +package eu.stratosphere.api.datastream; + +import java.io.Serializable; + +import eu.stratosphere.api.java.tuple.Tuple; + +public abstract class SinkFunction implements Serializable { + + private static final long serialVersionUID = 1L; + + public abstract void invoke(IN tuple); + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkInvokable.java new file mode 100755 index 00000000000..98619236ca6 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkInvokable.java @@ -0,0 +1,41 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.api.datastream; + +import eu.stratosphere.api.java.tuple.Tuple; +import eu.stratosphere.streaming.api.StreamCollector; +import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class SinkInvokable extends UserSinkInvokable { + private static final long serialVersionUID = 1L; + + private SinkFunction sinkFunction; + + public SinkInvokable(SinkFunction sinkFunction) { + this.sinkFunction = sinkFunction; + } + + @Override + public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + int batchSize = record.getBatchSize(); + for (int i = 0; i < batchSize; i++) { + @SuppressWarnings("unchecked") + IN tuple = (IN) record.getTuple(i); + sinkFunction.invoke(tuple); + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java index 88dbfc859a9..efe5844ea5b 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java @@ -101,6 +101,28 @@ public class StreamExecutionEnvironment { return returnStream; } + public DataStream addSink(DataStream inputStream, + SinkFunction sinkFunction) { + DataStream returnStream = new DataStream(this); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos; + try { + oos = new ObjectOutputStream(baos); + oos.writeObject(sinkFunction); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + jobGraphBuilder.setSink("sink", new SinkInvokable(sinkFunction), "sink", + baos.toByteArray()); + + connectGraph(inputStream.connectIDs, "sink", inputStream.ctype, inputStream.cparam); + + return returnStream; + } + public DataStream addMapFunction( DataStream inputStream, final MapFunction mapper) { DataStream returnStream = new DataStream(this); @@ -120,39 +142,23 @@ public class StreamExecutionEnvironment { connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype, inputStream.cparam); - + return returnStream; } - public static final class DummySink extends UserSinkInvokable> { + public static final class DummySink extends SinkFunction> { private static final long serialVersionUID = 1L; @Override - public void invoke(StreamRecord record, StreamCollector collector) throws Exception { - for (Tuple tuple : record.getBatchIterable()) { - System.out.println(tuple); - } + public void invoke(Tuple1 tuple) { + System.out.println(tuple); } } - public DataStream addDummySink(DataStream inputStream) { - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos; - try { - oos = new ObjectOutputStream(baos); - oos.writeObject(new DummySink()); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - jobGraphBuilder.setSink("sink", new DummySink(), "sink", baos.toByteArray()); + public DataStream addDummySink(DataStream inputStream) { - connectGraph(inputStream.connectIDs, "sink", inputStream.ctype, - inputStream.cparam); - return new DataStream(this); + return addSink(inputStream, (SinkFunction) new DummySink()); } public void execute() { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/UserSinkInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/UserSinkInvokable.java index 801267c12c1..8563eeddd87 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/UserSinkInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/UserSinkInvokable.java @@ -22,5 +22,6 @@ import eu.stratosphere.api.java.tuple.Tuple; public abstract class UserSinkInvokable extends StreamRecordInvokable implements Serializable { + private static final long serialVersionUID = 1L; } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index fc91d4e0449..60b2b3faf7a 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import eu.stratosphere.api.datastream.SinkFunction; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.tuple.Tuple; @@ -148,10 +149,10 @@ public final class StreamComponentHelper { } else if (operatorName.equals("sink")) { - UserSinkInvokable f = (UserSinkInvokable) in.readObject(); + SinkFunction f = (SinkFunction) in.readObject(); inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( - UserSinkInvokable.class, f.getClass(), 0, null, null); + SinkFunction.class, f.getClass(), 0, null, null); inTupleSerializer = inTupleTypeInfo.createSerializer(); inDeserializationDelegate = new DeserializationDelegate(inTupleSerializer); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java index ac90b89e10d..084f557ad29 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java @@ -8,6 +8,7 @@ import java.io.ObjectInputStream; import org.junit.Test; import eu.stratosphere.api.datastream.DataStream; +import eu.stratosphere.api.datastream.SinkFunction; import eu.stratosphere.api.datastream.StreamExecutionEnvironment; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.tuple.Tuple; @@ -34,6 +35,16 @@ public class FlatMapTest { } } + public static final class MySink extends SinkFunction> { + + @Override + public void invoke(Tuple1 tuple) { + // TODO Auto-generated method stub + System.out.println(tuple); + } + + } + @Test public void test() throws Exception { Tuple1 tup = new Tuple1("asd"); @@ -42,9 +53,9 @@ public class FlatMapTest { DataStream> dataStream0 = context.setDummySource(); DataStream> dataStream1 = context.setDummySource().connectWith(dataStream0) - .partitionBy(0).flatMap(new MyFlatMap()).broadcast().addDummySink(); + .partitionBy(0).flatMap(new MyFlatMap()).broadcast().addSink(new MySink()); - context.execute(); + context.execute(); JobGraphBuilder jgb = context.jobGB(); @@ -84,11 +95,11 @@ public class FlatMapTest { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)); - UserSinkInvokable f = (UserSinkInvokable) in.readObject(); + SinkFunction f = (SinkFunction) in.readObject(); System.out.println(f.getClass().getGenericSuperclass()); TupleTypeInfo ts = (TupleTypeInfo) TypeExtractor.createTypeInfo( - UserSinkInvokable.class, f.getClass(), 0, null, null); + SinkFunction.class, f.getClass(), 0, null, null); System.out.println(ts); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java index 08bb33dcb11..ba4617be531 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java @@ -8,6 +8,7 @@ import java.io.ObjectInputStream; import org.junit.Test; import eu.stratosphere.api.datastream.DataStream; +import eu.stratosphere.api.datastream.SinkFunction; import eu.stratosphere.api.datastream.StreamExecutionEnvironment; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.MapFunction; @@ -22,6 +23,7 @@ import eu.stratosphere.nephele.jobgraph.JobOutputVertex; import eu.stratosphere.nephele.jobgraph.JobTaskVertex; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent; import eu.stratosphere.util.Collector; @@ -71,7 +73,7 @@ public class MapTest { byte[] userFunctionSerialized = config.getBytes("serializedudf", null); in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized)); - StreamInvokableComponent userFunction = (StreamInvokableComponent) in.readObject(); + UserTaskInvokable userFunction = (UserTaskInvokable) in.readObject(); System.out.println(userFunction.getClass()); assertTrue(true); System.out.println("----------------"); @@ -84,11 +86,11 @@ public class MapTest { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)); - UserSinkInvokable f = (UserSinkInvokable) in.readObject(); + SinkFunction f = (SinkFunction) in.readObject(); System.out.println(f.getClass().getGenericSuperclass()); TupleTypeInfo ts = (TupleTypeInfo) TypeExtractor.createTypeInfo( - UserSinkInvokable.class, f.getClass(), 0, null, null); + SinkFunction.class, f.getClass(), 0, null, null); System.out.println(ts); -- GitLab