提交 ccc1f5f2 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] streamcollector2 bugfix

上级 54f06b16
...@@ -47,8 +47,10 @@ public class StreamCollector2<T extends Tuple> implements Collector<T> { ...@@ -47,8 +47,10 @@ public class StreamCollector2<T extends Tuple> implements Collector<T> {
this.keyPostition = keyPosition; this.keyPostition = keyPosition;
for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) { for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) {
List<RecordWriter<StreamRecord>> output = new ArrayList<RecordWriter<StreamRecord>>();
output.add(notPartitionedOutputs.get(i));
notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned.get(i), notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned.get(i),
batchTimeout, channelID, serializationDelegate, notPartitionedOutputs)); batchTimeout, channelID, serializationDelegate, output));
} }
for (int i = 0; i < batchSizesOfPartitioned.size(); i++) { for (int i = 0; i < batchSizesOfPartitioned.size(); i++) {
......
...@@ -51,6 +51,7 @@ public class PrintTest { ...@@ -51,6 +51,7 @@ public class PrintTest {
StreamExecutionEnvironment env = new StreamExecutionEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<Integer, String>> source = env.addSource(new MySource(), 1); DataStream<Tuple2<Integer, String>> source = env.addSource(new MySource(), 1);
DataStream<Tuple2<Integer, String>> map = source.flatMap(new MyFlatMap(), 1).print(); DataStream<Tuple2<Integer, String>> map = source.flatMap(new MyFlatMap(), 1).print();
source.print();
env.execute(); env.execute();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册