From ce56b1466dd851e253f2fa1042ff1157d74949b9 Mon Sep 17 00:00:00 2001
From: gyfora
Date: Mon, 14 Jul 2014 16:29:15 +0200
Subject: [PATCH] [streaming] connectWith and partitioning added to newapi

---
 .../api/datastream/DataStream.java            | 30 +++++++++++++-
 .../StreamExecutionEnvironment.java           | 35 +++++++++++++++--
 .../streaming/api/JobGraphBuilder.java        | 39 ++++++++++++-------
 .../streaming/api/FlatMapTest.java            |  5 ++-
 4 files changed, 88 insertions(+), 21 deletions(-)

diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java
index 375edcf37c8..b2965792de0 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java
@@ -15,8 +15,11 @@
 
 package eu.stratosphere.api.datastream;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
+import eu.stratosphere.api.datastream.StreamExecutionEnvironment.ConnectionType;
 import eu.stratosphere.api.java.functions.FlatMapFunction;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.tuple.Tuple;
@@ -28,11 +31,16 @@ public class DataStream {
 	private TypeInformation type;
 	private final Random random = new Random();
 	private final String id;
+	List connectIDs;
+	ConnectionType ctype = ConnectionType.SHUFFLE;
+	int cparam = 0;
 
 	protected DataStream() {
 		// TODO implement
 		context = new StreamExecutionEnvironment();
 		id = "source";
+		connectIDs = new ArrayList();
+		connectIDs.add(getId());
 	}
 
 	protected DataStream(StreamExecutionEnvironment context) {
@@ -40,15 +48,33 @@ public class DataStream {
 			throw new NullPointerException("context is null");
 		}
 
-		//TODO add name based on component number an preferable sequential id
+		// TODO add name based on component number an preferable sequential id
 		this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong());
 		this.context = context;
+		connectIDs = new ArrayList();
+		connectIDs.add(getId());
 	}
 
 	public String getId() {
 		return id;
 	}
 
+	public DataStream connectWith(DataStream stream) {
+		connectIDs.add(stream.getId());
+		return this;
+	}
+
+	public DataStream partitionBy(int keyposition) {
+		ctype = ConnectionType.FIELD;
+		cparam = keyposition;
+		return this;
+	}
+
+	public DataStream broadcast() {
+		ctype = ConnectionType.BROADCAST;
+		return this;
+	}
+
 	public DataStream flatMap(FlatMapFunction flatMapper) {
 		return context.addFlatMapFunction(this, flatMapper);
 	}
@@ -56,7 +82,7 @@ public class DataStream {
 	public DataStream map(MapFunction mapper) {
 		return context.addMapFunction(this, mapper);
 	}
-	
+
 	public DataStream addDummySink() {
 		return context.addDummySink(this);
 	}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java
index 6a7f016f93b..88dbfc859a9 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java
@@ -18,11 +18,13 @@ package eu.stratosphere.api.datastream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.util.List;
 
 import eu.stratosphere.api.java.functions.FlatMapFunction;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.tuple.Tuple;
 import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.nephele.io.InputChannelResult;
 import eu.stratosphere.streaming.api.JobGraphBuilder;
 import eu.stratosphere.streaming.api.StreamCollector;
 import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
@@ -54,6 +56,28 @@
 
 	}
 
+	public static enum ConnectionType {
+		SHUFFLE, BROADCAST, FIELD
+	}
+
+	private void connectGraph(List inputIDs, String outputID, ConnectionType type, int param) {
+
+		for (String input : inputIDs) {
+			switch (type) {
+			case SHUFFLE:
+				jobGraphBuilder.shuffleConnect(input, outputID);
+				break;
+			case BROADCAST:
+				jobGraphBuilder.broadcastConnect(input, outputID);
+				break;
+			case FIELD:
+				jobGraphBuilder.fieldsConnect(input, outputID, param);
+				break;
+			}
+
+		}
+	}
+
 	public DataStream addFlatMapFunction(
 			DataStream inputStream, final FlatMapFunction flatMapper) {
 		DataStream returnStream = new DataStream(this);
 
@@ -71,7 +95,8 @@
 		jobGraphBuilder.setTask(returnStream.getId(), new FlatMapInvokable(flatMapper), "flatMap",
 				baos.toByteArray());
 
-		jobGraphBuilder.shuffleConnect(inputStream.getId(), returnStream.getId());
+		connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype,
+				inputStream.cparam);
 
 		return returnStream;
 	}
@@ -93,8 +118,9 @@
 		jobGraphBuilder.setTask(returnStream.getId(), new MapInvokable(mapper), "map",
 				baos.toByteArray());
 
-		jobGraphBuilder.shuffleConnect(inputStream.getId(), returnStream.getId());
-
+		connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype,
+				inputStream.cparam);
+
 		return returnStream;
 	}
 
@@ -124,7 +150,8 @@
 
 		jobGraphBuilder.setSink("sink", new DummySink(), "sink", baos.toByteArray());
 
-		jobGraphBuilder.shuffleConnect(inputStream.getId(), "sink");
+		connectGraph(inputStream.connectIDs, "sink", inputStream.ctype,
+				inputStream.cparam);
 
 		return new DataStream(this);
 	}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
index 358de144f07..1d2a5beca37 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
@@ -128,8 +128,9 @@
 	 * @param subtasksPerInstance
 	 *            Number of subtasks allocated to a machine
 	 */
-	public Configuration setSource(String sourceName, UserSourceInvokable InvokableObject,
-			int parallelism, int subtasksPerInstance) {
+	public Configuration setSource(String sourceName,
+			UserSourceInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance) {
 		final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
 		source.setInputClass(StreamSource.class);
 		Configuration config = setComponent(sourceName, InvokableObject, parallelism,
@@ -174,7 +175,8 @@
 	 *            Number of subtasks allocated to a machine
 	 * @return
 	 */
-	public Configuration setTask(String taskName, UserTaskInvokable TaskInvokableObject,
+	public Configuration setTask(String taskName,
+			UserTaskInvokable TaskInvokableObject,
 			int parallelism, int subtasksPerInstance) {
 		final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
 		task.setTaskClass(StreamTask.class);
@@ -186,8 +188,8 @@
 		return config;
 	}
 
-	public void setSink(String sinkName, UserSinkInvokable InvokableObject, String operatorName,
-			byte[] serializedFunction) {
+	public void setSink(String sinkName, UserSinkInvokable InvokableObject,
+			String operatorName, byte[] serializedFunction) {
 		Configuration config = setSink(sinkName, InvokableObject, 1, 1);
 		config.setBytes("operator", serializedFunction);
 		config.setString("operatorName", operatorName);
@@ -206,8 +208,9 @@
 	 * @param subtasksPerInstance
 	 *            Number of subtasks allocated to a machine
 	 */
-	public Configuration setSink(String sinkName, UserSinkInvokable InvokableObject,
-			int parallelism, int subtasksPerInstance) {
+	public Configuration setSink(String sinkName,
+			UserSinkInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance) {
 		final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
 		sink.setOutputClass(StreamSink.class);
 		Configuration config = setComponent(sinkName, InvokableObject, parallelism,
@@ -257,8 +260,9 @@
 		return config;
 	}
 
-	private Configuration setComponent(String componentName, UserSourceInvokable InvokableObject,
-			int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
+	private Configuration setComponent(String componentName,
+			UserSourceInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance, AbstractJobVertex component) {
 		Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
 				subtasksPerInstance, component);
 
@@ -266,8 +270,9 @@
 		return config;
 	}
 
-	private Configuration setComponent(String componentName, UserTaskInvokable InvokableObject,
-			int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
+	private Configuration setComponent(String componentName,
+			UserTaskInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance, AbstractJobVertex component) {
 		Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
 				subtasksPerInstance, component);
 
@@ -275,8 +280,9 @@
 		return config;
 	}
 
-	private Configuration setComponent(String componentName, UserSinkInvokable InvokableObject,
-			int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
+	private Configuration setComponent(String componentName,
+			UserSinkInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance, AbstractJobVertex component) {
 		Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
 				subtasksPerInstance, component);
 
@@ -396,6 +402,7 @@
 	public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
 		connect(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class);
 		addOutputChannels(upStreamComponentName, numberOfInstances.get(downStreamComponentName));
+		log.info("Broadcastconnected: " + upStreamComponentName + " to " + downStreamComponentName);
 	}
 
 	/**
@@ -444,6 +451,9 @@
 					+ downStreamComponentName, e);
 			}
 		}
+		log.info("Fieldsconnected " + upStreamComponentName + " to " + downStreamComponentName
+				+ " on " + keyPosition);
+
 	}
 
 	/**
@@ -461,6 +471,8 @@
 	public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
 		connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
 		addOutputChannels(upStreamComponentName, 1);
+		log.info("Globalconnected: " + upStreamComponentName + " to " + downStreamComponentName);
+
 	}
 
 	/**
@@ -478,6 +490,7 @@
 	public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
 		connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class);
 		addOutputChannels(upStreamComponentName, 1);
+		log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
 	}
 
 	private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
index c2b98e3cc9d..ac90b89e10d 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
@@ -39,9 +39,10 @@ public class FlatMapTest {
 		Tuple1 tup = new Tuple1("asd");
 
 		StreamExecutionEnvironment context = new StreamExecutionEnvironment();
+		DataStream<Tuple1<String>> dataStream0 = context.setDummySource();
 
-		DataStream<Tuple1<String>> dataStream = context.setDummySource().flatMap(new MyFlatMap())
-				.addDummySink();
+		DataStream<Tuple1<String>> dataStream1 = context.setDummySource().connectWith(dataStream0)
+				.partitionBy(0).flatMap(new MyFlatMap()).broadcast().addDummySink();
 
 		context.execute();
 
-- 
GitLab
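
Usage note (reviewer addition, not part of the patch): the sketch below shows how the new connectWith/partitionBy/broadcast calls on DataStream are meant to be chained, mirroring the updated FlatMapTest above. MyFlatMap and setDummySource() are taken from that test and assumed to be in scope; the wrapper class, the main method, and the Tuple1<String> type parameters are illustrative assumptions, not code from this commit.

import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.tuple.Tuple1;

public class ConnectWithExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment context = new StreamExecutionEnvironment();

		// A second dummy source whose id will be merged into the next operator's input set.
		DataStream<Tuple1<String>> dataStream0 = context.setDummySource();

		// connectWith(dataStream0) adds dataStream0's id to this stream's connectIDs,
		// partitionBy(0) sets the connection type to FIELD on tuple field 0, so
		// addFlatMapFunction() wires both sources to the flatMap via fieldsConnect().
		// broadcast() on the flatMap's output stream then makes the flatMap -> sink
		// connection use the BroadcastPartitioner when addDummySink() builds the graph.
		DataStream<Tuple1<String>> result = context.setDummySource()
				.connectWith(dataStream0)
				.partitionBy(0)
				.flatMap(new MyFlatMap())
				.broadcast()
				.addDummySink();

		context.execute();
	}
}

Since ctype and cparam live on the upstream DataStream, the partitioning choice applies to the next operator added to that stream, and connectGraph() connects every id accumulated in connectIDs to that operator with the same ConnectionType.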