提交 709edb80 编写于 作者: A Aljoscha Krettek

[FLINK-4460] Add side output ITCase for multiple side out consumers

上级 c3fc8b97
......@@ -165,6 +165,73 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
assertEquals(Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE), resultSink.getSortedResult());
}
@Test
public void testSideOutputWithMultipleConsumers() throws Exception {
TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
TestListResultSink<Integer> resultSink = new TestListResultSink<>();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<Integer> dataStream = env.fromCollection(elements);
SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
.process(new ProcessFunction<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void processElement(
Integer value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value);
ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value));
}
});
passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink2);
passThroughtStream.addSink(resultSink);
env.execute();
assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink1.getSortedResult());
assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink2.getSortedResult());
assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
}
@Test
public void testSideOutputWithMultipleConsumersWithObjectReuse() throws Exception {
TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
TestListResultSink<Integer> resultSink = new TestListResultSink<>();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.setParallelism(3);
DataStream<Integer> dataStream = env.fromCollection(elements);
SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
.process(new ProcessFunction<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void processElement(
Integer value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value);
ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value));
}
});
passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink2);
passThroughtStream.addSink(resultSink);
env.execute();
assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink1.getSortedResult());
assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink2.getSortedResult());
assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
}
/**
* Test ProcessFunction side output.
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册