提交 25484701 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] minor update for partitioning setting with connectWith

上级 44c378ae
......@@ -48,7 +48,7 @@ public class DataStream<T extends Tuple> {
if (context == null) {
throw new NullPointerException("context is null");
}
// TODO add name based on component number an preferable sequential id
this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong());
this.context = context;
......@@ -77,18 +77,23 @@ public class DataStream<T extends Tuple> {
}
public DataStream<T> partitionBy(int keyposition) {
ctypes.set(0, ConnectionType.FIELD);
cparams.set(0, keyposition);
for (int i = 0; i < ctypes.size(); i++) {
ctypes.set(i, ConnectionType.FIELD);
cparams.set(i, keyposition);
}
return this;
}
public DataStream<T> broadcast() {
ctypes.set(0, ConnectionType.BROADCAST);
for (int i = 0; i < ctypes.size(); i++) {
ctypes.set(i, ConnectionType.BROADCAST);
}
return this;
}
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) {
return context.addFunction("flatMap", this, flatMapper, new FlatMapInvokable<T, R>(flatMapper));
return context.addFunction("flatMap", this, flatMapper, new FlatMapInvokable<T, R>(
flatMapper));
}
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
......@@ -96,13 +101,14 @@ public class DataStream<T extends Tuple> {
}
public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer) {
return context.addFunction("batchReduce", this, reducer, new BatchReduceInvokable<T, R>(reducer));
return context.addFunction("batchReduce", this, reducer, new BatchReduceInvokable<T, R>(
reducer));
}
public DataStream<T> filter(FilterFunction<T> filter) {
return context.addFunction("filter", this, filter, new FilterInvokable<T>(filter));
}
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
return context.addSink(this, sinkFunction);
}
......
......@@ -31,11 +31,11 @@ public class JoinLocal {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple3<String, String, Integer>> source1 = context
.addSource(new JoinSourceOne()).partitionBy(1);
.addSource(new JoinSourceOne());
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Integer>> source2 = context
.addSource(new JoinSourceTwo()).partitionBy(1).connectWith(source1)
.addSource(new JoinSourceTwo()).connectWith(source1).partitionBy(1)
.flatMap(new JoinTask()).addSink(new JoinSink());
context.execute();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册