From 48512386d17352d551234f5004066262e3f665c3 Mon Sep 17 00:00:00 2001 From: ghermann Date: Mon, 14 Jul 2014 16:29:16 +0200 Subject: [PATCH] [streaming] Tested BatchReduce --- .../api/datastream/DataStream.java | 2 +- .../api/datastream/FileSourceFunction.java | 14 ++++ .../StreamComponentHelper.java | 19 ++--- .../examples/wordcount/WordCountCounter.java | 20 ++--- .../examples/wordcount/WordCountSplitter.java | 48 ++++++----- .../streaming/api/BatchReduceTest.java | 84 +++++++++++++++++++ 6 files changed, 143 insertions(+), 44 deletions(-) create mode 100644 flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java index cfcd200206f..0941401e1a7 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java @@ -94,7 +94,7 @@ public class DataStream { return context.addMapFunction(this, mapper); } - public DataStream flatMap(GroupReduceFunction reducer) { + public DataStream batchReduce(GroupReduceFunction reducer) { return context.addBatchReduceFunction(this, reducer); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java index 2ded87ebf0e..0ecf10b1131 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java @@ -1,3 +1,17 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ package eu.stratosphere.api.datastream; import java.io.BufferedReader; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index 37be5818f24..1a26e1f78a4 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -117,9 +117,7 @@ public final class StreamComponentHelper { try { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes)); - if (operatorName.equals("flatMap")) { - FlatMapFunction f = (FlatMapFunction) in.readObject(); inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( @@ -152,16 +150,17 @@ public final class StreamComponentHelper { } else if (operatorName.equals("batchReduce")) { - GroupReduceFunction f = (GroupReduceFunction) in.readObject(); + GroupReduceFunction f = (GroupReduceFunction) in + .readObject(); - inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class, - f.getClass(), 0, null, null); + inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( + GroupReduceFunction.class, f.getClass(), 0, null, null); inTupleSerializer = inTupleTypeInfo.createSerializer(); inDeserializationDelegate = new DeserializationDelegate(inTupleSerializer); - outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class, - f.getClass(), 1, null, null); + outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( + GroupReduceFunction.class, f.getClass(), 1, null, null); outTupleSerializer = outTupleTypeInfo.createSerializer(); outSerializationDelegate = new SerializationDelegate(outTupleSerializer); @@ -170,8 +169,8 @@ public final class StreamComponentHelper { SinkFunction f = (SinkFunction) in.readObject(); - inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( - SinkFunction.class, f.getClass(), 0, null, null); + inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(SinkFunction.class, + f.getClass(), 0, null, null); inTupleSerializer = inTupleTypeInfo.createSerializer(); inDeserializationDelegate = new DeserializationDelegate(inTupleSerializer); @@ -195,7 +194,7 @@ public final class StreamComponentHelper { } } - + public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration) throws StreamComponentException { int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java index 9eba70e9b8e..408223af60f 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java @@ -18,22 +18,22 @@ package eu.stratosphere.streaming.examples.wordcount; import java.util.HashMap; import java.util.Map; +import eu.stratosphere.api.java.functions.MapFunction; +import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple2; -import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; -import eu.stratosphere.streaming.api.streamrecord.StreamRecord; -public class WordCountCounter extends UserTaskInvokable { +public class WordCountCounter extends MapFunction, Tuple2> { private static final long serialVersionUID = 1L; private Map wordCounts = new HashMap(); private String word = ""; private Integer count = 0; - private StreamRecord outRecord = new StreamRecord(new Tuple2()); - + private Tuple2 outTuple = new Tuple2(); + @Override - public void invoke(StreamRecord record) throws Exception { - word = record.getString(0); + public Tuple2 map(Tuple1 inTuple) throws Exception { + word = inTuple.f0; if (wordCounts.containsKey(word)) { count = wordCounts.get(word) + 1; @@ -43,10 +43,10 @@ public class WordCountCounter extends UserTaskInvokable { wordCounts.put(word, 1); } - outRecord.setString(0, word); - outRecord.setInteger(1, count); + outTuple.f0 = word; + outTuple.f1 = count; - emit(outRecord); + return outTuple; // performanceCounter.count(); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java index a11487d2abe..e03360591b1 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java @@ -15,39 +15,41 @@ package eu.stratosphere.streaming.examples.wordcount; +import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.tuple.Tuple1; -import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; -import eu.stratosphere.streaming.api.streamrecord.StreamRecord; -import eu.stratosphere.streaming.util.PerformanceCounter; -import eu.stratosphere.streaming.util.PerformanceTimer; +import eu.stratosphere.util.Collector; -public class WordCountSplitter extends UserTaskInvokable { +public class WordCountSplitter extends FlatMapFunction, Tuple1> { private static final long serialVersionUID = 1L; private String[] words = new String[] {}; - private StreamRecord outputRecord = new StreamRecord(new Tuple1()); - PerformanceCounter pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 1000, - "/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID); - PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000, 1000, true, - "/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID); + private Tuple1 outTuple = new Tuple1(); + + //TODO move the performance tracked version to a separate package and clean this + // PerformanceCounter pCounter = new + // PerformanceCounter("SplitterEmitCounter", 1000, 1000, + // "/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID); + // PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000, + // 1000, true, + // "/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID); @Override - public void invoke(StreamRecord record) throws Exception { + public void flatMap(Tuple1 inTuple, Collector> out) throws Exception { - words = record.getString(0).split(" "); + words = inTuple.f0.split(" "); for (String word : words) { - outputRecord.setString(0, word); - pTimer.startTimer(); - emit(outputRecord); - pTimer.stopTimer(); - pCounter.count(); + outTuple.f0 = word; + // pTimer.startTimer(); + out.collect(outTuple); + // pTimer.stopTimer(); + // pCounter.count(); } } - @Override - public String getResult() { - pCounter.writeCSV(); - pTimer.writeCSV(); - return ""; - } + // @Override + // public String getResult() { + // pCounter.writeCSV(); + // pTimer.writeCSV(); + // return ""; + // } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java new file mode 100644 index 00000000000..9adf2675db9 --- /dev/null +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java @@ -0,0 +1,84 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.api; + +import static org.junit.Assert.fail; + +import java.util.Iterator; + +import org.junit.Test; + +import eu.stratosphere.api.datastream.DataStream; +import eu.stratosphere.api.datastream.SinkFunction; +import eu.stratosphere.api.datastream.SourceFunction; +import eu.stratosphere.api.datastream.StreamExecutionEnvironment; +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.functions.GroupReduceFunction; +import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.util.Collector; + +public class BatchReduceTest { + + public static final class MyBatchReduce extends + GroupReduceFunction, Tuple1> { + + @Override + public void reduce(Iterator> values, Collector> out) + throws Exception { + + Double sum = 0.; + Double count = 0.; + while (values.hasNext()) { + sum += values.next().f0; + count++; + } + + out.collect(new Tuple1(sum / count)); + + System.out.println("batchReduce " + sum); + } + } + + public static final class MySink extends SinkFunction> { + private static final long serialVersionUID = 1L; + + @Override + public void invoke(Tuple1 tuple) { + System.out.println("AVG: " + tuple); + } + + } + + public static final class MySource extends SourceFunction> { + private static final long serialVersionUID = 1L; + + @Override + public void invoke(Collector> collector) { + for (Double i = 0.; i < 20; i++) { + collector.collect(new Tuple1(i)); + } + } + } + + @Test + public void test() throws Exception { + + StreamExecutionEnvironment context = new StreamExecutionEnvironment(4); + DataStream> dataStream0 = context.addSource(new MySource()).batchReduce(new MyBatchReduce()).addSink(new MySink()); + + context.execute(); + } +} -- GitLab