diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index e61b39ade9c994a8814cd4d93ae040dd4427e09d..9c5d9f072b82e9689d4fb1ac1b242a597eda8a8b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -33,6 +33,9 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.util.Preconditions; +import java.util.HashMap; +import java.util.Map; + import static java.util.Objects.requireNonNull; /** @@ -47,6 +50,13 @@ public class SingleOutputStreamOperator extends DataStream { /** Indicate this is a non-parallel operator and cannot set a non-1 degree of parallelism. **/ protected boolean nonParallel = false; + /** + * We keep track of the side outputs that were already requested and their types. With this, + * we can catch the case when a side output with a matching id is requested for a different + * type because this would lead to problems at runtime. 
+ */ + private Map, TypeInformation> requestedSideOutputs = new HashMap<>(); + protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation transformation) { super(environment, transformation); } @@ -425,9 +435,22 @@ public class SingleOutputStreamOperator extends DataStream { * * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object) */ - public DataStream getSideOutput(OutputTag sideOutputTag){ - sideOutputTag = clean(sideOutputTag); - SideOutputTransformation sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), requireNonNull(sideOutputTag)); + public DataStream getSideOutput(OutputTag sideOutputTag) { + sideOutputTag = clean(requireNonNull(sideOutputTag)); + + // make a defensive copy + sideOutputTag = new OutputTag(sideOutputTag.getId(), sideOutputTag.getTypeInfo()); + + TypeInformation type = requestedSideOutputs.get(sideOutputTag); + if (type != null && !type.equals(sideOutputTag.getTypeInfo())) { + throw new UnsupportedOperationException("A side output with a matching id was " + + "already requested with a different type. 
This is not allowed, side output " + + "ids need to be unique."); + } + + requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo()); + + SideOutputTransformation sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), sideOutputTag); return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index e792a5a855d738d6da788355f30b50ee4fb5a184..a99efb1c64f04ba845cab25ba8505e1b846eaafb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -310,6 +310,23 @@ public class StreamGraph extends StreamingPlan { throw new IllegalStateException("Already has virtual output node with id " + virtualId); } + // verify that we don't already have a virtual node for the given originalId/outputTag + // combination with a different TypeInformation. Tags with a different id are independent side + // outputs and may carry any type; only a tag with the same id but a different type indicates + // that someone is trying to read one side output of an operation with two different types. + + for (Tuple2 tag : virtualSideOutputNodes.values()) { + if (!tag.f0.equals(originalId)) { + // different source operator + continue; + } + + if (tag.f1.getId().equals(outputTag.getId()) && !tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) { + throw new IllegalArgumentException("Trying to add a side output for the same id " + + "with a different type. 
This is not allowed."); + } + } + virtualSideOutputNodes.put(virtualId, new Tuple2<>(originalId, outputTag)); } @@ -358,7 +375,8 @@ public class StreamGraph extends StreamingPlan { downStreamVertexID, typeNumber, null, - new ArrayList(), null); + new ArrayList(), + null); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java index 44ba576abb7b40e934743f3f58072c07d3ca7f6b..27124cc849572d93fa4cc275fbb3b9fd2cf79420 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java @@ -40,7 +40,9 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.test.streaming.runtime.util.TestListResultSink; import org.apache.flink.util.Collector; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import javax.annotation.Nullable; import java.io.Serializable; @@ -56,6 +58,9 @@ import static org.junit.Assert.assertEquals; */ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implements Serializable { + @Rule + public transient ExpectedException expectedException = ExpectedException.none(); + static List elements = new ArrayList<>(); static { elements.add(1); @@ -65,14 +70,14 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen elements.add(4); } - private final static OutputTag sideOutputTag1 = new OutputTag("side"){}; - private final static OutputTag sideOutputTag2 = new OutputTag("other-side"){}; - /** * Verify that watermarks are forwarded to all side outputs. 
*/ @Test public void testWatermarkForwarding() throws Exception { + final OutputTag sideOutputTag1 = new OutputTag("side"){}; + final OutputTag sideOutputTag2 = new OutputTag("other-side"){}; + TestListResultSink sideOutputResultSink1 = new TestListResultSink<>(); TestListResultSink sideOutputResultSink2 = new TestListResultSink<>(); TestListResultSink resultSink = new TestListResultSink<>(); @@ -167,6 +172,8 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen @Test public void testSideOutputWithMultipleConsumers() throws Exception { + final OutputTag sideOutputTag = new OutputTag("side"){}; + TestListResultSink sideOutputResultSink1 = new TestListResultSink<>(); TestListResultSink sideOutputResultSink2 = new TestListResultSink<>(); TestListResultSink resultSink = new TestListResultSink<>(); @@ -184,12 +191,12 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen public void processElement( Integer value, Context ctx, Collector out) throws Exception { out.collect(value); - ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value)); + ctx.output(sideOutputTag, "sideout-" + String.valueOf(value)); } }); - passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1); - passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink2); + passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink1); + passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink2); passThroughtStream.addSink(resultSink); env.execute(); @@ -200,6 +207,8 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen @Test public void testSideOutputWithMultipleConsumersWithObjectReuse() throws Exception { + final OutputTag sideOutputTag = new OutputTag("side"){}; + TestListResultSink sideOutputResultSink1 = new TestListResultSink<>(); TestListResultSink sideOutputResultSink2 = new TestListResultSink<>(); TestListResultSink resultSink = new 
TestListResultSink<>(); @@ -218,12 +227,12 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen public void processElement( Integer value, Context ctx, Collector out) throws Exception { out.collect(value); - ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value)); + ctx.output(sideOutputTag, "sideout-" + String.valueOf(value)); } }); - passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1); - passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink2); + passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink1); + passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink2); passThroughtStream.addSink(resultSink); env.execute(); @@ -232,13 +241,13 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult()); } - /** - * Test ProcessFunction side output. - */ @Test - public void testProcessFunctionSideOutput() throws Exception { - TestListResultSink sideOutputResultSink = new TestListResultSink<>(); - TestListResultSink resultSink = new TestListResultSink<>(); + public void testSideOutputNameClash() throws Exception { + final OutputTag sideOutputTag1 = new OutputTag("side"){}; + final OutputTag sideOutputTag2 = new OutputTag("side"){}; + + TestListResultSink sideOutputResultSink1 = new TestListResultSink<>(); + TestListResultSink sideOutputResultSink2 = new TestListResultSink<>(); StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); @@ -254,22 +263,23 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen Integer value, Context ctx, Collector out) throws Exception { out.collect(value); ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value)); + ctx.output(sideOutputTag2, 13); } }); - passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink); 
- passThroughtStream.addSink(resultSink); - see.execute(); + passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1); - assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink.getSortedResult()); - assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult()); + expectedException.expect(UnsupportedOperationException.class); + passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2); } /** - * Test keyed ProcessFunction side output. + * Test ProcessFunction side output. */ @Test - public void testKeyedProcessFunctionSideOutput() throws Exception { + public void testProcessFunctionSideOutput() throws Exception { + final OutputTag sideOutputTag = new OutputTag("side"){}; + TestListResultSink sideOutputResultSink = new TestListResultSink<>(); TestListResultSink resultSink = new TestListResultSink<>(); @@ -279,14 +289,6 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen DataStream dataStream = see.fromCollection(elements); SingleOutputStreamOperator passThroughtStream = dataStream - .keyBy(new KeySelector() { - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }) .process(new ProcessFunction() { private static final long serialVersionUID = 1L; @@ -294,11 +296,11 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen public void processElement( Integer value, Context ctx, Collector out) throws Exception { out.collect(value); - ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value)); + ctx.output(sideOutputTag, "sideout-" + String.valueOf(value)); } }); - passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink); + passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink); passThroughtStream.addSink(resultSink); see.execute(); @@ -306,20 +308,30 @@ public 
class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult()); } - /** - * Test ProcessFunction side outputs with wrong {@code OutputTag}. + * Test keyed ProcessFunction side output. */ @Test - public void testProcessFunctionSideOutputWithWrongTag() throws Exception { + public void testKeyedProcessFunctionSideOutput() throws Exception { + final OutputTag sideOutputTag = new OutputTag("side"){}; + TestListResultSink sideOutputResultSink = new TestListResultSink<>(); + TestListResultSink resultSink = new TestListResultSink<>(); StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); DataStream dataStream = see.fromCollection(elements); - dataStream + SingleOutputStreamOperator passThroughtStream = dataStream + .keyBy(new KeySelector() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }) .process(new ProcessFunction() { private static final long serialVersionUID = 1L; @@ -327,20 +339,27 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen public void processElement( Integer value, Context ctx, Collector out) throws Exception { out.collect(value); - ctx.output(sideOutputTag2, "sideout-" + String.valueOf(value)); + ctx.output(sideOutputTag, "sideout-" + String.valueOf(value)); } - }).getSideOutput(sideOutputTag1).addSink(sideOutputResultSink); + }); + passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink); + passThroughtStream.addSink(resultSink); see.execute(); - assertEquals(Arrays.asList(), sideOutputResultSink.getSortedResult()); + assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink.getSortedResult()); + assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult()); } + /** - * Test keyed ProcessFunction side 
outputs with wrong {@code OutputTag}. + * Test ProcessFunction side outputs with wrong {@code OutputTag}. */ @Test - public void testKeyedProcessFunctionSideOutputWithWrongTag() throws Exception { + public void testProcessFunctionSideOutputWithWrongTag() throws Exception { + final OutputTag sideOutputTag1 = new OutputTag("side"){}; + final OutputTag sideOutputTag2 = new OutputTag("other-side"){}; + TestListResultSink sideOutputResultSink = new TestListResultSink<>(); StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -349,14 +368,6 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen DataStream dataStream = see.fromCollection(elements); dataStream - .keyBy(new KeySelector() { - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }) .process(new ProcessFunction() { private static final long serialVersionUID = 1L; @@ -373,7 +384,6 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen assertEquals(Arrays.asList(), sideOutputResultSink.getSortedResult()); } - private static class TestWatermarkAssigner implements AssignerWithPunctuatedWatermarks { private static final long serialVersionUID = 1L;