提交 a39148af 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] connectWith fix for different connection types

上级 adfb62aa
......@@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import scala.Array;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment.ConnectionType;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction;
......@@ -32,15 +33,14 @@ public class DataStream<T extends Tuple> {
private final Random random = new Random();
private final String id;
List<String> connectIDs;
ConnectionType ctype = ConnectionType.SHUFFLE;
int cparam = 0;
List<ConnectionType> ctypes;
List<Integer> cparams;
protected DataStream() {
// TODO implement
context = new StreamExecutionEnvironment();
id = "source";
connectIDs = new ArrayList<String>();
connectIDs.add(getId());
initConnections();
}
protected DataStream(StreamExecutionEnvironment context) {
......@@ -51,8 +51,17 @@ public class DataStream<T extends Tuple> {
// TODO add name based on component number an preferable sequential id
this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong());
this.context = context;
initConnections();
}
private void initConnections() {
connectIDs = new ArrayList<String>();
connectIDs.add(getId());
ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>();
ctypes.add(ConnectionType.SHUFFLE);
cparams = new ArrayList<Integer>();
cparams.add(0);
}
public String getId() {
......@@ -60,18 +69,20 @@ public class DataStream<T extends Tuple> {
}
public DataStream<T> connectWith(DataStream<T> stream) {
connectIDs.add(stream.getId());
connectIDs.addAll(stream.connectIDs);
ctypes.addAll(stream.ctypes);
cparams.addAll(stream.cparams);
return this;
}
public DataStream<T> partitionBy(int keyposition) {
ctype = ConnectionType.FIELD;
cparam = keyposition;
ctypes.set(0, ConnectionType.FIELD);
cparams.set(0, keyposition);
return this;
}
public DataStream<T> broadcast() {
ctype = ConnectionType.BROADCAST;
ctypes.set(0, ConnectionType.BROADCAST);
return this;
}
......@@ -82,12 +93,12 @@ public class DataStream<T extends Tuple> {
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
return context.addMapFunction(this, mapper);
}
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
return context.addSink(this, sinkFunction);
}
public DataStream<T> addDummySink() {
public DataStream<T> addDummySink() {
return context.addDummySink(this);
}
......
......@@ -59,9 +59,13 @@ public class StreamExecutionEnvironment {
SHUFFLE, BROADCAST, FIELD
}
private void connectGraph(List<String> inputIDs, String outputID, ConnectionType type, int param) {
private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
ConnectionType type = inputStream.ctypes.get(i);
String input = inputStream.connectIDs.get(i);
int param = inputStream.cparams.get(i);
for (String input : inputIDs) {
switch (type) {
case SHUFFLE:
jobGraphBuilder.shuffleConnect(input, outputID);
......@@ -94,8 +98,7 @@ public class StreamExecutionEnvironment {
jobGraphBuilder.setTask(returnStream.getId(), new FlatMapInvokable<T, R>(flatMapper),
"flatMap", baos.toByteArray());
connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype,
inputStream.cparam);
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
......@@ -117,8 +120,7 @@ public class StreamExecutionEnvironment {
jobGraphBuilder.setTask(returnStream.getId(), new MapInvokable<T, R>(mapper), "map",
baos.toByteArray());
connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype,
inputStream.cparam);
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
......@@ -140,7 +142,7 @@ public class StreamExecutionEnvironment {
jobGraphBuilder.setSink("sink", new SinkInvokable<T>(sinkFunction), "sink",
baos.toByteArray());
connectGraph(inputStream.connectIDs, "sink", inputStream.ctype, inputStream.cparam);
connectGraph(inputStream, "sink");
return returnStream;
}
......
......@@ -55,7 +55,6 @@ public class FlatMapTest {
@Override
public void invoke(Tuple1<String> tuple) {
// TODO Auto-generated method stub
System.out.println(tuple);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册