From cde1d46344741ff8c87d2bad62d2f49e05fe3fe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Balassi?= Date: Mon, 14 Jul 2014 16:29:17 +0200 Subject: [PATCH] [streaming] package refactor --- .../api/datastream/FileSourceFunction.java | 34 -------- .../api}/BatchReduceInvokable.java | 3 +- .../api}/DataStream.java | 6 +- .../streaming/api/FileSourceFunction.java | 48 +++++++++++ .../api}/FlatMapInvokable.java | 3 +- .../api}/MapInvokable.java | 3 +- .../api}/SinkFunction.java | 2 +- .../api}/SinkInvokable.java | 3 +- .../api}/SourceFunction.java | 2 +- .../api}/StreamExecutionEnvironment.java | 3 +- .../StreamComponentHelper.java | 21 +++-- .../examples/wordcount/WordCountLocal.java | 4 +- .../examples/wordcount/WordCountSink.java | 2 +- .../streaming/api/BatchReduceTest.java | 80 +++++++++++++++++++ .../streaming/api/FlatMapTest.java | 4 - .../stratosphere/streaming/api/MapTest.java | 3 - .../streaming/api/TypeExtractTest.java | 52 ++++++++++++ 17 files changed, 203 insertions(+), 70 deletions(-) delete mode 100644 flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/{api/datastream => streaming/api}/BatchReduceInvokable.java (89%) rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/{api/datastream => streaming/api}/DataStream.java (94%) create mode 100644 flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FileSourceFunction.java rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/{api/datastream => streaming/api}/FlatMapInvokable.java (94%) rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/{api/datastream => streaming/api}/MapInvokable.java (94%) rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/{api/datastream => streaming/api}/SinkFunction.java (94%) rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/{api/datastream => streaming/api}/SinkInvokable.java (94%) rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/{api/datastream => streaming/api}/SourceFunction.java (94%) rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/{api/datastream => streaming/api}/StreamExecutionEnvironment.java (98%) create mode 100644 flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java create mode 100755 flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/TypeExtractTest.java diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java deleted file mode 100644 index 2ded87ebf0e..00000000000 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java +++ /dev/null @@ -1,34 +0,0 @@ -package eu.stratosphere.api.datastream; - -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; - -import eu.stratosphere.api.java.tuple.Tuple1; -import eu.stratosphere.util.Collector; - -public class FileSourceFunction extends SourceFunction> { - private static final long serialVersionUID = 1L; - - private final String path; - private Tuple1 outTuple = new Tuple1(); - - public FileSourceFunction(String path) { - this.path = path; - } - - @Override - public void invoke(Collector> collector) throws IOException { - BufferedReader br = new BufferedReader(new FileReader(path)); - String line = br.readLine(); - while (line != null) { - if (line != "") { - outTuple.f0 = line; - collector.collect(outTuple); - } - line = br.readLine(); - } - br.close(); - } - -} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/BatchReduceInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/BatchReduceInvokable.java similarity index 89% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/BatchReduceInvokable.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/BatchReduceInvokable.java index c2fb499ba14..673a8e287e0 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/BatchReduceInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/BatchReduceInvokable.java @@ -1,10 +1,9 @@ -package eu.stratosphere.api.datastream; +package eu.stratosphere.streaming.api; import java.util.Iterator; import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.tuple.Tuple; -import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java similarity index 94% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java index cfcd200206f..b463553a2f2 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java @@ -13,17 +13,17 @@ * **********************************************************************************************************************/ -package eu.stratosphere.api.datastream; +package eu.stratosphere.streaming.api; import java.util.ArrayList; import java.util.List; import java.util.Random; -import eu.stratosphere.api.datastream.StreamExecutionEnvironment.ConnectionType; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.tuple.Tuple; +import eu.stratosphere.streaming.api.StreamExecutionEnvironment.ConnectionType; import eu.stratosphere.types.TypeInformation; public class DataStream { @@ -94,7 +94,7 @@ public class DataStream { return context.addMapFunction(this, mapper); } - public DataStream flatMap(GroupReduceFunction reducer) { + public DataStream batchReduce(GroupReduceFunction reducer) { return context.addBatchReduceFunction(this, reducer); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FileSourceFunction.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FileSourceFunction.java new file mode 100644 index 00000000000..9d28de68194 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FileSourceFunction.java @@ -0,0 +1,48 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.streaming.api; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; + +import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.util.Collector; + +public class FileSourceFunction extends SourceFunction> { + private static final long serialVersionUID = 1L; + + private final String path; + private Tuple1 outTuple = new Tuple1(); + + public FileSourceFunction(String path) { + this.path = path; + } + + @Override + public void invoke(Collector> collector) throws IOException { + BufferedReader br = new BufferedReader(new FileReader(path)); + String line = br.readLine(); + while (line != null) { + if (line != "") { + outTuple.f0 = line; + collector.collect(outTuple); + } + line = br.readLine(); + } + br.close(); + } + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FlatMapInvokable.java similarity index 94% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FlatMapInvokable.java index daa7fd6f70a..619ac0bd07a 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FlatMapInvokable.java @@ -13,11 +13,10 @@ * **********************************************************************************************************************/ -package eu.stratosphere.api.datastream; +package eu.stratosphere.streaming.api; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.tuple.Tuple; -import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/MapInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/MapInvokable.java similarity index 94% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/MapInvokable.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/MapInvokable.java index f8f469a0539..544423d5758 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/MapInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/MapInvokable.java @@ -13,11 +13,10 @@ * **********************************************************************************************************************/ -package eu.stratosphere.api.datastream; +package eu.stratosphere.streaming.api; import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.tuple.Tuple; -import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkFunction.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkFunction.java similarity index 94% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkFunction.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkFunction.java index 8f8ff51897b..b8c98f233d0 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkFunction.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkFunction.java @@ -13,7 +13,7 @@ * **********************************************************************************************************************/ -package eu.stratosphere.api.datastream; +package eu.stratosphere.streaming.api; import java.io.Serializable; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkInvokable.java similarity index 94% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkInvokable.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkInvokable.java index 98619236ca6..3260711fe3c 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SinkInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkInvokable.java @@ -13,10 +13,9 @@ * **********************************************************************************************************************/ -package eu.stratosphere.api.datastream; +package eu.stratosphere.streaming.api; import eu.stratosphere.api.java.tuple.Tuple; -import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SourceFunction.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SourceFunction.java similarity index 94% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SourceFunction.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SourceFunction.java index 855a4b877fd..b9b66105dcc 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SourceFunction.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SourceFunction.java @@ -13,7 +13,7 @@ * **********************************************************************************************************************/ -package eu.stratosphere.api.datastream; +package eu.stratosphere.streaming.api; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java similarity index 98% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java index 0dd34c80f56..d99d42604cf 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java @@ -13,7 +13,7 @@ * **********************************************************************************************************************/ -package eu.stratosphere.api.datastream; +package eu.stratosphere.streaming.api; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -24,7 +24,6 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple1; -import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.ClusterUtil; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index 37be5818f24..8beef100987 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -24,7 +24,6 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import eu.stratosphere.api.datastream.SinkFunction; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.functions.MapFunction; @@ -42,6 +41,7 @@ import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractInvokable; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; +import eu.stratosphere.streaming.api.SinkFunction; import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable; import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable; @@ -117,9 +117,7 @@ public final class StreamComponentHelper { try { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes)); - if (operatorName.equals("flatMap")) { - FlatMapFunction f = (FlatMapFunction) in.readObject(); inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( @@ -152,16 +150,17 @@ public final class StreamComponentHelper { } else if (operatorName.equals("batchReduce")) { - GroupReduceFunction f = (GroupReduceFunction) in.readObject(); + GroupReduceFunction f = (GroupReduceFunction) in + .readObject(); - inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class, - f.getClass(), 0, null, null); + inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( + GroupReduceFunction.class, f.getClass(), 0, null, null); inTupleSerializer = inTupleTypeInfo.createSerializer(); inDeserializationDelegate = new DeserializationDelegate(inTupleSerializer); - outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class, - f.getClass(), 1, null, null); + outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( + GroupReduceFunction.class, f.getClass(), 1, null, null); outTupleSerializer = outTupleTypeInfo.createSerializer(); outSerializationDelegate = new SerializationDelegate(outTupleSerializer); @@ -170,8 +169,8 @@ public final class StreamComponentHelper { SinkFunction f = (SinkFunction) in.readObject(); - inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( - SinkFunction.class, f.getClass(), 0, null, null); + inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(SinkFunction.class, + f.getClass(), 0, null, null); inTupleSerializer = inTupleTypeInfo.createSerializer(); inDeserializationDelegate = new DeserializationDelegate(inTupleSerializer); @@ -195,7 +194,7 @@ public final class StreamComponentHelper { } } - + public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration) throws StreamComponentException { int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java index 86ae4e5555a..889c72c79e4 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java @@ -15,9 +15,9 @@ package eu.stratosphere.streaming.examples.wordcount; -import eu.stratosphere.api.datastream.DataStream; -import eu.stratosphere.api.datastream.StreamExecutionEnvironment; import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.streaming.api.DataStream; +import eu.stratosphere.streaming.api.StreamExecutionEnvironment; public class WordCountLocal { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java index a0bca1decb8..3a8e4659fbb 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java @@ -15,8 +15,8 @@ package eu.stratosphere.streaming.examples.wordcount; -import eu.stratosphere.api.datastream.SinkFunction; import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.streaming.api.SinkFunction; public class WordCountSink extends SinkFunction> { private static final long serialVersionUID = 1L; diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java new file mode 100644 index 00000000000..958e0ee263c --- /dev/null +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java @@ -0,0 +1,80 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.api; + +import static org.junit.Assert.fail; + +import java.util.Iterator; + +import org.junit.Test; + +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.functions.GroupReduceFunction; +import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.util.Collector; + +public class BatchReduceTest { + + public static final class MyBatchReduce extends + GroupReduceFunction, Tuple1> { + + @Override + public void reduce(Iterator> values, Collector> out) + throws Exception { + + Double sum = 0.; + Double count = 0.; + while (values.hasNext()) { + sum += values.next().f0; + count++; + } + + out.collect(new Tuple1(sum / count)); + + System.out.println("batchReduce " + sum); + } + } + + public static final class MySink extends SinkFunction> { + private static final long serialVersionUID = 1L; + + @Override + public void invoke(Tuple1 tuple) { + System.out.println("AVG: " + tuple); + } + + } + + public static final class MySource extends SourceFunction> { + private static final long serialVersionUID = 1L; + + @Override + public void invoke(Collector> collector) { + for (Double i = 0.; i < 20; i++) { + collector.collect(new Tuple1(i)); + } + } + } + + @Test + public void test() throws Exception { + + StreamExecutionEnvironment context = new StreamExecutionEnvironment(4); + DataStream> dataStream0 = context.addSource(new MySource()).batchReduce(new MyBatchReduce()).addSink(new MySink()); + + context.execute(); + } +} diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java index 49772ef5731..a5cc3cb5215 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java @@ -23,10 +23,6 @@ import java.io.ObjectInputStream; import org.junit.Test; -import eu.stratosphere.api.datastream.DataStream; -import eu.stratosphere.api.datastream.SinkFunction; -import eu.stratosphere.api.datastream.SourceFunction; -import eu.stratosphere.api.datastream.StreamExecutionEnvironment; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple1; diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java index d849667aedd..fa433598c5d 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java @@ -22,9 +22,6 @@ import java.io.ObjectInputStream; import org.junit.Test; -import eu.stratosphere.api.datastream.DataStream; -import eu.stratosphere.api.datastream.SinkFunction; -import eu.stratosphere.api.datastream.StreamExecutionEnvironment; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.tuple.Tuple; diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/TypeExtractTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/TypeExtractTest.java new file mode 100755 index 00000000000..97fae1b5127 --- /dev/null +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/TypeExtractTest.java @@ -0,0 +1,52 @@ +package eu.stratosphere.streaming.api; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.junit.Test; + +import eu.stratosphere.api.java.tuple.Tuple; +import eu.stratosphere.api.java.typeutils.TupleTypeInfo; +import eu.stratosphere.api.java.typeutils.TypeExtractor; +import eu.stratosphere.types.TypeInformation; + +public class TypeExtractTest { + + public static class MySuperlass implements Serializable{ + + } + + public static class Myclass extends MySuperlass { + + private static final long serialVersionUID = 1L; + + } + + @Test + public void test() throws IOException, ClassNotFoundException { + + Myclass f = new Myclass(); + + System.out.println(f.getClass().getGenericSuperclass()); + TypeInformation ts = TypeExtractor.createTypeInfo(MySuperlass.class, f.getClass(), 0, + null, null); + + System.out.println(ts); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos; + + oos = new ObjectOutputStream(baos); + oos.writeObject(f); + + ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray())); + + + System.out.println(new TupleTypeInfo(TypeExtractor.getForObject(in.readObject()))); + } + +} -- GitLab