diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java index 0baad8514b886e5b962654b05dbb9f8c0d957663..cc1b8d83d5b46a0fd8c98caff5804da9cc03a267 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import scala.Array; import eu.stratosphere.api.datastream.StreamExecutionEnvironment.ConnectionType; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.MapFunction; @@ -32,15 +33,14 @@ public class DataStream { private final Random random = new Random(); private final String id; List connectIDs; - ConnectionType ctype = ConnectionType.SHUFFLE; - int cparam = 0; + List ctypes; + List cparams; protected DataStream() { // TODO implement context = new StreamExecutionEnvironment(); id = "source"; - connectIDs = new ArrayList(); - connectIDs.add(getId()); + initConnections(); } protected DataStream(StreamExecutionEnvironment context) { @@ -51,8 +51,17 @@ public class DataStream { // TODO add name based on component number an preferable sequential id this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong()); this.context = context; + initConnections(); + + } + + private void initConnections() { connectIDs = new ArrayList(); connectIDs.add(getId()); + ctypes = new ArrayList(); + ctypes.add(ConnectionType.SHUFFLE); + cparams = new ArrayList(); + cparams.add(0); } public String getId() { @@ -60,18 +69,20 @@ public class DataStream { } public DataStream connectWith(DataStream stream) { - connectIDs.add(stream.getId()); + connectIDs.addAll(stream.connectIDs); + ctypes.addAll(stream.ctypes); + cparams.addAll(stream.cparams); return this; } public DataStream partitionBy(int keyposition) { - ctype = ConnectionType.FIELD; - cparam = keyposition; + ctypes.set(0, ConnectionType.FIELD); + cparams.set(0, keyposition); return this; } public DataStream broadcast() { - ctype = ConnectionType.BROADCAST; + ctypes.set(0, ConnectionType.BROADCAST); return this; } @@ -82,12 +93,12 @@ public class DataStream { public DataStream map(MapFunction mapper) { return context.addMapFunction(this, mapper); } - + public DataStream addSink(SinkFunction sinkFunction) { return context.addSink(this, sinkFunction); } - public DataStream addDummySink() { + public DataStream addDummySink() { return context.addDummySink(this); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java index 88c8353f3b306fec3d339a844c1a1d3069288b9a..4e2f4dd507a70791f832fc169979feee731918ca 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java @@ -59,9 +59,13 @@ public class StreamExecutionEnvironment { SHUFFLE, BROADCAST, FIELD } - private void connectGraph(List inputIDs, String outputID, ConnectionType type, int param) { + private void connectGraph(DataStream inputStream, String outputID) { + + for (int i = 0; i < inputStream.connectIDs.size(); i++) { + ConnectionType type = inputStream.ctypes.get(i); + String input = inputStream.connectIDs.get(i); + int param = inputStream.cparams.get(i); - for (String input : inputIDs) { switch (type) { case SHUFFLE: jobGraphBuilder.shuffleConnect(input, outputID); @@ -94,8 +98,7 @@ public class StreamExecutionEnvironment { jobGraphBuilder.setTask(returnStream.getId(), new FlatMapInvokable(flatMapper), "flatMap", baos.toByteArray()); - connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype, - inputStream.cparam); + connectGraph(inputStream, returnStream.getId()); return returnStream; } @@ -117,8 +120,7 @@ public class StreamExecutionEnvironment { jobGraphBuilder.setTask(returnStream.getId(), new MapInvokable(mapper), "map", baos.toByteArray()); - connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype, - inputStream.cparam); + connectGraph(inputStream, returnStream.getId()); return returnStream; } @@ -140,7 +142,7 @@ public class StreamExecutionEnvironment { jobGraphBuilder.setSink("sink", new SinkInvokable(sinkFunction), "sink", baos.toByteArray()); - connectGraph(inputStream.connectIDs, "sink", inputStream.ctype, inputStream.cparam); + connectGraph(inputStream, "sink"); return returnStream; } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java index 6b43ec29b0fbdec03d3b10990a4cab8ed26064be..77549a083ecfdb69cd1770d0d4dc6b28fe2162dc 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java @@ -55,7 +55,6 @@ public class FlatMapTest { @Override public void invoke(Tuple1 tuple) { - // TODO Auto-generated method stub System.out.println(tuple); }