Commit ce56b146 authored by gyfora, committed by Stephan Ewen

[streaming] connectWith and partitioning added to newapi

Parent ba25a0b0
DataStream.java
@@ -15,8 +15,11 @@
 package eu.stratosphere.api.datastream;

+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;

+import eu.stratosphere.api.datastream.StreamExecutionEnvironment.ConnectionType;
 import eu.stratosphere.api.java.functions.FlatMapFunction;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.tuple.Tuple;
@@ -28,11 +31,16 @@ public class DataStream<T extends Tuple> {
     private TypeInformation<T> type;
     private final Random random = new Random();
     private final String id;
+    List<String> connectIDs;
+    ConnectionType ctype = ConnectionType.SHUFFLE;
+    int cparam = 0;

     protected DataStream() {
         // TODO implement
         context = new StreamExecutionEnvironment();
         id = "source";
+        connectIDs = new ArrayList<String>();
+        connectIDs.add(getId());
     }

     protected DataStream(StreamExecutionEnvironment context) {
@@ -40,15 +48,33 @@ public class DataStream<T extends Tuple> {
             throw new NullPointerException("context is null");
         }
-        //TODO add name based on component number an preferable sequential id
+        // TODO add name based on component number an preferable sequential id
         this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong());
         this.context = context;
+        connectIDs = new ArrayList<String>();
+        connectIDs.add(getId());
     }

     public String getId() {
         return id;
     }

+    public DataStream<T> connectWith(DataStream<T> stream) {
+        connectIDs.add(stream.getId());
+        return this;
+    }
+
+    public DataStream<T> partitionBy(int keyposition) {
+        ctype = ConnectionType.FIELD;
+        cparam = keyposition;
+        return this;
+    }
+
+    public DataStream<T> broadcast() {
+        ctype = ConnectionType.BROADCAST;
+        return this;
+    }
+
     public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) {
         return context.addFlatMapFunction(this, flatMapper);
     }
@@ -56,7 +82,7 @@ public class DataStream<T extends Tuple> {
     public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
         return context.addMapFunction(this, mapper);
     }

     public <R extends Tuple> DataStream<R> addDummySink() {
         return context.addDummySink(this);
     }
...
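The new connectWith, partitionBy, and broadcast calls above only record connection state on the DataStream (connectIDs, ctype, cparam); the actual wiring is performed later by StreamExecutionEnvironment.connectGraph when an operator or sink is added. A minimal usage sketch, assuming the dummy source/sink and the MyFlatMap function from FlatMapTest at the end of this diff:

StreamExecutionEnvironment context = new StreamExecutionEnvironment();

// a second dummy source to merge into the pipeline
DataStream<Tuple1<String>> dataStream0 = context.setDummySource();

DataStream<Tuple1<String>> dataStream1 = context.setDummySource()
        .connectWith(dataStream0)  // adds dataStream0's id to connectIDs
        .partitionBy(0)            // ctype = FIELD, cparam = 0 for the next operator
        .flatMap(new MyFlatMap())  // both sources are connected to this task
        .broadcast()               // the flatMap output is broadcast to what follows
        .addDummySink();           // here: the dummy sink

context.execute();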
StreamExecutionEnvironment.java
@@ -18,11 +18,13 @@ package eu.stratosphere.api.datastream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.util.List;

 import eu.stratosphere.api.java.functions.FlatMapFunction;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.tuple.Tuple;
 import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.nephele.io.InputChannelResult;
 import eu.stratosphere.streaming.api.JobGraphBuilder;
 import eu.stratosphere.streaming.api.StreamCollector;
 import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
@@ -54,6 +56,28 @@ public class StreamExecutionEnvironment {
     }

+    public static enum ConnectionType {
+        SHUFFLE, BROADCAST, FIELD
+    }
+
+    private void connectGraph(List<String> inputIDs, String outputID, ConnectionType type, int param) {
+        for (String input : inputIDs) {
+            switch (type) {
+            case SHUFFLE:
+                jobGraphBuilder.shuffleConnect(input, outputID);
+                break;
+            case BROADCAST:
+                jobGraphBuilder.broadcastConnect(input, outputID);
+                break;
+            case FIELD:
+                jobGraphBuilder.fieldsConnect(input, outputID, param);
+                break;
+            }
+        }
+    }
+
     public <T extends Tuple, R extends Tuple> DataStream<R> addFlatMapFunction(
             DataStream<T> inputStream, final FlatMapFunction<T, R> flatMapper) {
         DataStream<R> returnStream = new DataStream<R>(this);
@@ -71,7 +95,8 @@ public class StreamExecutionEnvironment {
         jobGraphBuilder.setTask(returnStream.getId(), new FlatMapInvokable<T, R>(flatMapper),
                 "flatMap", baos.toByteArray());
-        jobGraphBuilder.shuffleConnect(inputStream.getId(), returnStream.getId());
+        connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype,
+                inputStream.cparam);

         return returnStream;
     }
@@ -93,8 +118,9 @@ public class StreamExecutionEnvironment {
         jobGraphBuilder.setTask(returnStream.getId(), new MapInvokable<T, R>(mapper), "map",
                 baos.toByteArray());
-        jobGraphBuilder.shuffleConnect(inputStream.getId(), returnStream.getId());
+        connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype,
+                inputStream.cparam);

         return returnStream;
     }
@@ -124,7 +150,8 @@ public class StreamExecutionEnvironment {
         jobGraphBuilder.setSink("sink", new DummySink(), "sink", baos.toByteArray());
-        jobGraphBuilder.shuffleConnect(inputStream.getId(), "sink");
+        connectGraph(inputStream.connectIDs, "sink", inputStream.ctype,
+                inputStream.cparam);

         return new DataStream<R>(this);
     }
...
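For the pipeline sketched earlier, connectGraph translates the recorded state into one JobGraphBuilder call per upstream id. The component ids are the random hex strings generated in DataStream; the names below are placeholders used only for illustration:

// flatMap input: connectIDs = [source1Id, source0Id], ctype = FIELD, cparam = 0
//     jobGraphBuilder.fieldsConnect(source1Id, flatMapId, 0);
//     jobGraphBuilder.fieldsConnect(source0Id, flatMapId, 0);
// sink input: connectIDs = [flatMapId], ctype = BROADCAST
//     jobGraphBuilder.broadcastConnect(flatMapId, "sink");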
JobGraphBuilder.java
@@ -128,8 +128,9 @@ public class JobGraphBuilder {
      * @param subtasksPerInstance
      *            Number of subtasks allocated to a machine
      */
-    public Configuration setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
-            int parallelism, int subtasksPerInstance) {
+    public Configuration setSource(String sourceName,
+            UserSourceInvokable<? extends Tuple> InvokableObject, int parallelism,
+            int subtasksPerInstance) {
         final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
         source.setInputClass(StreamSource.class);
         Configuration config = setComponent(sourceName, InvokableObject, parallelism,
@@ -174,7 +175,8 @@
      *            Number of subtasks allocated to a machine
      * @return
      */
-    public Configuration setTask(String taskName, UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
+    public Configuration setTask(String taskName,
+            UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
             int parallelism, int subtasksPerInstance) {
         final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
         task.setTaskClass(StreamTask.class);
@@ -186,8 +188,8 @@
         return config;
     }

-    public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject, String operatorName,
-            byte[] serializedFunction) {
+    public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
+            String operatorName, byte[] serializedFunction) {
         Configuration config = setSink(sinkName, InvokableObject, 1, 1);
         config.setBytes("operator", serializedFunction);
         config.setString("operatorName", operatorName);
@@ -206,8 +208,9 @@
      * @param subtasksPerInstance
      *            Number of subtasks allocated to a machine
      */
-    public Configuration setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
-            int parallelism, int subtasksPerInstance) {
+    public Configuration setSink(String sinkName,
+            UserSinkInvokable<? extends Tuple> InvokableObject, int parallelism,
+            int subtasksPerInstance) {
         final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
         sink.setOutputClass(StreamSink.class);
         Configuration config = setComponent(sinkName, InvokableObject, parallelism,
@@ -257,8 +260,9 @@
         return config;
     }

-    private Configuration setComponent(String componentName, UserSourceInvokable<? extends Tuple> InvokableObject,
-            int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
+    private Configuration setComponent(String componentName,
+            UserSourceInvokable<? extends Tuple> InvokableObject, int parallelism,
+            int subtasksPerInstance, AbstractJobVertex component) {
         Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
                 subtasksPerInstance, component);
@@ -266,8 +270,9 @@
         return config;
     }

-    private Configuration setComponent(String componentName, UserTaskInvokable<? extends Tuple, ? extends Tuple> InvokableObject,
-            int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
+    private Configuration setComponent(String componentName,
+            UserTaskInvokable<? extends Tuple, ? extends Tuple> InvokableObject, int parallelism,
+            int subtasksPerInstance, AbstractJobVertex component) {
         Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
                 subtasksPerInstance, component);
@@ -275,8 +280,9 @@
         return config;
     }

-    private Configuration setComponent(String componentName, UserSinkInvokable<? extends Tuple> InvokableObject,
-            int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
+    private Configuration setComponent(String componentName,
+            UserSinkInvokable<? extends Tuple> InvokableObject, int parallelism,
+            int subtasksPerInstance, AbstractJobVertex component) {
         Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
                 subtasksPerInstance, component);
@@ -396,6 +402,7 @@
     public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
         connect(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class);
         addOutputChannels(upStreamComponentName, numberOfInstances.get(downStreamComponentName));
+        log.info("Broadcastconnected: " + upStreamComponentName + " to " + downStreamComponentName);
     }

     /**
@@ -444,6 +451,9 @@
                         + downStreamComponentName, e);
             }
         }
+        log.info("Fieldsconnected " + upStreamComponentName + " to " + downStreamComponentName
+                + " on " + keyPosition);
     }

     /**
@@ -461,6 +471,8 @@
     public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
         connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
         addOutputChannels(upStreamComponentName, 1);
+        log.info("Globalconnected: " + upStreamComponentName + " to " + downStreamComponentName);
     }

     /**
@@ -478,6 +490,7 @@
     public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
         connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class);
         addOutputChannels(upStreamComponentName, 1);
+        log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
     }

     private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
...
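On the JobGraphBuilder side, each connect variant wires a different partitioner between the two components and, with this change, logs the connection it makes. A rough summary of the behavior visible in the diff (log formats taken from the added log.info calls; keyPos stands for the keyPosition argument):

// shuffleConnect(up, down)          -> ShufflePartitioner;   logs "Shuffleconnected: up to down"
// broadcastConnect(up, down)        -> BroadcastPartitioner; logs "Broadcastconnected: up to down"
// globalConnect(up, down)           -> GlobalPartitioner;    logs "Globalconnected: up to down"
// fieldsConnect(up, down, keyPos)   -> partitions on field keyPos; logs "Fieldsconnected up to down on keyPos"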
FlatMapTest.java
@@ -39,9 +39,10 @@ public class FlatMapTest {
         Tuple1<String> tup = new Tuple1<String>("asd");

         StreamExecutionEnvironment context = new StreamExecutionEnvironment();
-        DataStream<Tuple1<String>> dataStream = context.setDummySource().flatMap(new MyFlatMap())
-                .addDummySink();
+        DataStream<Tuple1<String>> dataStream0 = context.setDummySource();
+        DataStream<Tuple1<String>> dataStream1 = context.setDummySource().connectWith(dataStream0)
+                .partitionBy(0).flatMap(new MyFlatMap()).broadcast().addDummySink();

         context.execute();
...