diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/BatchReduceInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/BatchReduceInvokable.java
new file mode 100644
index 0000000000000000000000000000000000000000..c2fb499ba146ed62a1ff4fe384b06cf4a711a55c
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/BatchReduceInvokable.java
@@ -0,0 +1,25 @@
+package eu.stratosphere.api.datastream;
+
+import java.util.Iterator;
+
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.tuple.Tuple;
+import eu.stratosphere.streaming.api.StreamCollector;
+import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+
+public class BatchReduceInvokable extends UserTaskInvokable {
+	private static final long serialVersionUID = 1L;
+
+	private GroupReduceFunction reducer;
+	public BatchReduceInvokable(GroupReduceFunction reduceFunction) {
+		this.reducer = reduceFunction;
+	}
+
+	@Override
+	public void invoke(StreamRecord record, StreamCollector collector) throws Exception {
+		Iterator iterator = (Iterator) record.getBatchIterable().iterator();
+		reducer.reduce(iterator, collector);
+	}
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java
index 3f4299340bc6927e85bbd454d2853a689664160c..cfcd200206f224191f57ab77d01e6c7fb6fa5afb 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java
@@ -21,12 +21,11 @@ import java.util.Random;
 
 import eu.stratosphere.api.datastream.StreamExecutionEnvironment.ConnectionType;
 import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.tuple.Tuple;
 import eu.stratosphere.types.TypeInformation;
 
-//TODO:get batchsize from user -> put in config -> set in streamcomponenthelper for collector
-//TODO:batchReduce -> tuple iterator over tuplebatch, out tuple (reduce function)
 public class DataStream {
 
 	private final StreamExecutionEnvironment context;
@@ -48,7 +47,7 @@ public class DataStream {
 		if (context == null) {
 			throw new NullPointerException("context is null");
 		}
-
+		// TODO add a name based on the component number and preferably a sequential id
 		this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong());
 		this.context = context;
 
@@ -95,6 +94,10 @@ public class DataStream {
 		return context.addMapFunction(this, mapper);
 	}
 
+	public DataStream flatMap(GroupReduceFunction reducer) {
+		return context.addBatchReduceFunction(this, reducer);
+	}
+
 	public DataStream addSink(SinkFunction sinkFunction) {
 		return context.addSink(this, sinkFunction);
 	}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..2ded87ebf0e74703242a4208cd2a2d941de917dd
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FileSourceFunction.java
@@ -0,0 +1,34 @@
+package eu.stratosphere.api.datastream;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.util.Collector;
+
+public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
+	private static final long serialVersionUID = 1L;
+
+	private final String path;
+	private Tuple1<String> outTuple = new Tuple1<String>();
+
+	public FileSourceFunction(String path) {
+		this.path = path;
+	}
+
+	@Override
+	public void invoke(Collector<Tuple1<String>> collector) throws IOException {
+		BufferedReader br = new BufferedReader(new FileReader(path));
+		String line = br.readLine();
+		while (line != null) {
+			if (!line.equals("")) {
+				outTuple.f0 = line;
+				collector.collect(outTuple);
+			}
+			line = br.readLine();
+		}
+		br.close();
+	}
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java
index 2d619dff69eb58f783f8fc9dd34a6b348f5d5375..0dd34c80f5635d57f09e7712c42040a2806017f6 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 
 import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.tuple.Tuple;
 import eu.stratosphere.api.java.tuple.Tuple1;
@@ -34,10 +35,15 @@ import eu.stratosphere.util.Collector;
 public class StreamExecutionEnvironment {
 	JobGraphBuilder jobGraphBuilder;
 
-	private static final int BATCH_SIZE = 1;
+	public StreamExecutionEnvironment(int batchSize) {
+		if (batchSize < 1) {
+			throw new IllegalArgumentException("Batch size must be positive.");
+		}
+		jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE, batchSize);
+	}
 
 	public StreamExecutionEnvironment() {
-		jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE);
+		this(1);
 	}
 
 	private static class DummySource extends UserSourceInvokable<Tuple1<String>> {
@@ -49,9 +55,8 @@
 			collector.collect(new Tuple1<String>("source"));
 		}
 	}
-
 	}
-
+
 	public static enum ConnectionType {
 		SHUFFLE, BROADCAST, FIELD
 	}
 
@@ -122,6 +127,27 @@
 		return returnStream;
 	}
 
+	public DataStream addBatchReduceFunction(
+			DataStream inputStream, final GroupReduceFunction reducer) {
+		DataStream returnStream = new DataStream(this);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		ObjectOutputStream oos;
+		try {
+			oos = new ObjectOutputStream(baos);
+			oos.writeObject(reducer);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+
+		jobGraphBuilder.setTask(returnStream.getId(), new BatchReduceInvokable(reducer),
+				"batchReduce", baos.toByteArray());
+
+		connectGraph(inputStream, returnStream.getId());
+
+		return returnStream;
+	}
+
 	public DataStream addSink(DataStream inputStream, SinkFunction sinkFunction) {
 		DataStream returnStream = new DataStream(this);
 
@@ -182,6 +208,10 @@
 		return returnStream;
 	}
 
+	public DataStream<Tuple1<String>> addFileSource(String path) {
+		return addSource(new FileSourceFunction(path));
+	}
+
 	public DataStream<Tuple1<String>> addDummySource() {
 		DataStream<Tuple1<String>> returnStream = new DataStream<Tuple1<String>>(this);
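A minimal usage sketch (not part of the patch) of the API surface added above: a GroupReduceFunction is handed to DataStream.flatMap(...), which StreamExecutionEnvironment.addBatchReduceFunction serializes and wires to a BatchReduceInvokable, while the batch size given to the new StreamExecutionEnvironment constructor reaches each StreamCollector through the "batchSize" config entry set further down in JobGraphBuilder. The BatchCounter class, the input path and the main method are made up for illustration, and the reduce(Iterator, Collector) signature of GroupReduceFunction is assumed from its use in BatchReduceInvokable.

import java.util.Iterator;

import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.util.Collector;

public class BatchReduceExample {

	// Illustrative reducer: counts the tuples of one batch and emits a single
	// (label, count) tuple per batch.
	public static class BatchCounter extends
			GroupReduceFunction<Tuple1<String>, Tuple2<String, Integer>> {
		private static final long serialVersionUID = 1L;

		@Override
		public void reduce(Iterator<Tuple1<String>> values,
				Collector<Tuple2<String, Integer>> out) throws Exception {
			int count = 0;
			while (values.hasNext()) {
				values.next();
				count++;
			}
			out.collect(new Tuple2<String, Integer>("count", count));
		}
	}

	public static void main(String[] args) {
		// Batch size 4 is forwarded to the JobGraphBuilder and, via the
		// "batchSize" config entry, to every StreamCollector.
		StreamExecutionEnvironment env = new StreamExecutionEnvironment(4);

		// One Tuple1<String> per non-empty line of the (hypothetical) input file.
		DataStream<Tuple1<String>> lines = env.addFileSource("hamlet.txt");

		// flatMap(GroupReduceFunction) is the entry point this patch routes to
		// addBatchReduceFunction / BatchReduceInvokable.
		lines.flatMap(new BatchCounter());
	}
}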
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
index 1d2a5beca37e7296a49842ed0b4e3fcd6eea3eb2..0b730e6f69c02c5d2aa334d9a861842707267b4b 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
@@ -64,7 +64,7 @@ public class JobGraphBuilder {
 	protected String maxParallelismVertexName;
 	protected int maxParallelism;
 	protected FaultToleranceType faultToleranceType;
-
+	private int batchSize;
 	/**
 	 * Creates a new JobGraph with the given name
 	 *
@@ -84,6 +84,7 @@
 			log.debug("JobGraph created");
 		}
 		this.faultToleranceType = faultToleranceType;
+		batchSize = 1;
 	}
 
 	/**
@@ -97,6 +98,11 @@
 		this(jobGraphName, FaultToleranceType.NONE);
 	}
 
+	public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, int batchSize) {
+		this(jobGraphName, faultToleranceType);
+		this.batchSize = batchSize;
+	}
+
 	/**
 	 * Adds source to the JobGraph by user defined object with no parallelism
 	 *
@@ -111,7 +117,6 @@
 		Configuration config = setSource(sourceName, InvokableObject, 1, 1);
 		config.setBytes("operator", serializedFunction);
 		config.setString("operatorName", operatorName);
-
 	}
 
 	/**
@@ -250,7 +255,7 @@
 		Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
 		config.setClass("userfunction", InvokableClass);
 		config.setString("componentName", componentName);
-
+		config.setInteger("batchSize", batchSize);
 		// config.setBytes("operator", getSerializedFunction());
 		config.setInteger("faultToleranceType", faultToleranceType.id);
 
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/UserSourceInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/UserSourceInvokable.java
index 5dd4fe74787c4181ecd5da642eed18b23c66b761..f75d53e21af0f1311042580712dc8e6296181c7b 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/UserSourceInvokable.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/UserSourceInvokable.java
@@ -15,6 +15,7 @@
 
 package eu.stratosphere.streaming.api.invokable;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 import eu.stratosphere.api.java.tuple.Tuple;
@@ -25,6 +26,6 @@ public abstract class UserSourceInvokable extends StreamCompo
 
 	private static final long serialVersionUID = 1L;
 
-	public abstract void invoke(Collector collector);
+	public abstract void invoke(Collector collector) throws Exception;
 
 }
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
index 60b2b3faf7a89a6a9710822aa42f04b771976f11..37be5818f24a525c9c47f0ccb75accc498e0e776 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 
 import eu.stratosphere.api.datastream.SinkFunction;
 import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.tuple.Tuple;
 import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
@@ -104,10 +105,12 @@
 	public StreamCollector setCollector(Configuration taskConfiguration, int id,
 			List<RecordWriter<StreamRecord>> outputs) {
-		collector = new StreamCollector(1, id, outSerializationDelegate, outputs);
+		int batchSize = taskConfiguration.getInteger("batchSize", -1);
+		collector = new StreamCollector(batchSize, id, outSerializationDelegate, outputs);
 		return collector;
 	}
 
+	// TODO add type parameters to avoid redundant code
 	public void setSerializers(Configuration taskConfiguration) {
 		byte[] operatorBytes = taskConfiguration.getBytes("operator", null);
 		String operatorName = taskConfiguration.getString("operatorName", "");
@@ -147,6 +150,22 @@
 			outTupleSerializer = outTupleTypeInfo.createSerializer();
 			outSerializationDelegate = new SerializationDelegate(outTupleSerializer);
 
+		} else if (operatorName.equals("batchReduce")) {
+
+			GroupReduceFunction f = (GroupReduceFunction) in.readObject();
+
+			inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class,
+					f.getClass(), 0, null, null);
+
+			inTupleSerializer = inTupleTypeInfo.createSerializer();
+			inDeserializationDelegate = new DeserializationDelegate(inTupleSerializer);
+
+			outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class,
+					f.getClass(), 1, null, null);
+
+			outTupleSerializer = outTupleTypeInfo.createSerializer();
+			outSerializationDelegate = new SerializationDelegate(outTupleSerializer);
+
 		} else if (operatorName.equals("sink")) {
 
 			SinkFunction f = (SinkFunction) in.readObject();
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java
index 408223af60fba584b3a87f470a74633eda9ad8ae..9eba70e9b8edc7c26a4bae6479e56faacb303b9a 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java
@@ -18,22 +18,22 @@ package eu.stratosphere.streaming.examples.wordcount;
 import java.util.HashMap;
 import java.util.Map;
 
-import eu.stratosphere.api.java.functions.MapFunction;
-import eu.stratosphere.api.java.tuple.Tuple1;
 import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
 
-public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String, Integer>> {
+public class WordCountCounter extends UserTaskInvokable {
 	private static final long serialVersionUID = 1L;
 
 	private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
 	private String word = "";
 	private Integer count = 0;
 
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
+	private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
+
 	@Override
-	public Tuple2<String, Integer> map(Tuple1<String> inTuple) throws Exception {
-		word = inTuple.f0;
+	public void invoke(StreamRecord record) throws Exception {
+		word = record.getString(0);
 
 		if (wordCounts.containsKey(word)) {
 			count = wordCounts.get(word) + 1;
@@ -43,10 +43,10 @@ public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String, Integer>> {
-public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
+public class WordCountSplitter extends UserTaskInvokable {
 	private static final long serialVersionUID = 1L;
 
 	private String[] words = new String[] {};
-	private Tuple1<String> outTuple = new Tuple1<String>();
-
-	//TODO move the performance tracked version to a separate package and clean this
-	// PerformanceCounter pCounter = new
-	// PerformanceCounter("SplitterEmitCounter", 1000, 1000,
-	// "/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID);
-	// PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000,
-	// 1000, true,
-	// "/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
+	private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
+	PerformanceCounter pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 1000,
+			"/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID);
+	PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000, 1000, true,
+			"/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
 
 	@Override
-	public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
+	public void invoke(StreamRecord record) throws Exception {
 
-		words = inTuple.f0.split(" ");
+		words = record.getString(0).split(" ");
 
 		for (String word : words) {
-			outTuple.f0 = word;
-			// pTimer.startTimer();
-			out.collect(outTuple);
-			// pTimer.stopTimer();
-			// pCounter.count();
+			outputRecord.setString(0, word);
+			pTimer.startTimer();
+			emit(outputRecord);
+			pTimer.stopTimer();
+			pCounter.count();
 		}
 	}
 
-	// @Override
-	// public String getResult() {
-	// pCounter.writeCSV();
-	// pTimer.writeCSV();
-	// return "";
-	// }
+	@Override
+	public String getResult() {
+		pCounter.writeCSV();
+		pTimer.writeCSV();
+		return "";
+	}
 }
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
index 77549a083ecfdb69cd1770d0d4dc6b28fe2162dc..49772ef5731a631df50467e3b8cb6b56ecb91235 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
@@ -16,6 +16,7 @@ package eu.stratosphere.streaming.api;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
@@ -67,15 +68,19 @@
 			for (int i = 0; i < 5; i++) {
 				collector.collect(new Tuple1<String>("hi"));
 			}
-
 		}
-
 	}
 
 	@Test
 	public void test() throws Exception {
-		StreamExecutionEnvironment context = new StreamExecutionEnvironment();
+		try {
+			StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0);
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+
+		StreamExecutionEnvironment context = new StreamExecutionEnvironment(2);
 		DataStream<Tuple1<String>> dataStream0 = context.addSource(new MySource());
 		DataStream<Tuple1<String>> dataStream1 = context.addDummySource().connectWith(dataStream0)