diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
index 4f8b1bee2ea5df0cae57e3a863a80b05bf3aaca5..b139b621e9d586f877d0676553479f101b53e02b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -36,6 +36,8 @@ import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * A Channel represents the result produced by an operator and the data exchange
  * before the consumption by the target operator.
@@ -181,7 +183,18 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	}
 
 	/**
-	 * Gets the data exchange mode (batch / streaming) to use for the data
+	 * Sets the data exchange mode (batch / pipelined) to use for the data
+	 * exchange of this channel.
+	 *
+	 * @param dataExchangeMode The data exchange mode to use for this channel.
+	 * @throws NullPointerException Thrown, if the given data exchange mode is null.
+	 */
+	public void setDataExchangeMode(DataExchangeMode dataExchangeMode) {
+		this.dataExchangeMode = checkNotNull(dataExchangeMode);
+	}
+
+	/**
+	 * Gets the data exchange mode (batch / pipelined) to use for the data
 	 * exchange of this channel.
 	 *
 	 * @return The data exchange mode of this channel.
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index d440063838776095759a7edf9b54766e3099be82..943ec2e248fa265bdc873e2db88cc8ead6320e1c 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -593,6 +593,16 @@ public class JobGraphGenerator implements Visitor { if (inputPlanNode instanceof NAryUnionPlanNode) { allInChannels = ((NAryUnionPlanNode) inputPlanNode).getListOfInputs().iterator(); + + // If the union node has a batch data exchange, we have to adopt the exchange mode of + // the inputs of the union as well, because the optimizer has a separate union + // node, which does not exist in the JobGraph. Otherwise, this can result in + // deadlocks when closing a branching flow at runtime. + if (input.getDataExchangeMode().equals(DataExchangeMode.BATCH)) { + for (Channel in : inputPlanNode.getInputs()) { + in.setDataExchangeMode(DataExchangeMode.BATCH); + } + } } else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) { if (this.vertices.get(inputPlanNode) == null) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java b/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f7ea9119d798456c56279194f299cdee8dcb55e1 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.NAryUnionPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.plan.SourcePlanNode; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.util.CompilerTestBase; +import org.apache.flink.runtime.io.network.DataExchangeMode; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static 
org.apache.flink.runtime.io.network.DataExchangeMode.BATCH; +import static org.apache.flink.runtime.io.network.DataExchangeMode.PIPELINED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * This tests a fix for FLINK-2540. + * + *

<p>This test is necessary, because {@link NAryUnionPlanNode}s are not directly translated + * to runtime tasks by the {@link JobGraphGenerator}. Instead, the network stack unions the + * inputs by directly reading from multiple inputs (via {@link UnionInputGate}). + * + *

<pre>
+ *   (source)-\        /-\
+ *            (union)-+  (join)
+ *   (source)-/        \-/
+ * </pre>
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-2540">FLINK-2540</a>
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings({"serial","unchecked"})
+public class UnionClosedBranchingTest extends CompilerTestBase {
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> params() {
+		Collection<Object[]> params = Arrays.asList(new Object[][]{
+				{ExecutionMode.PIPELINED, PIPELINED, BATCH},
+				{ExecutionMode.PIPELINED_FORCED, PIPELINED, PIPELINED},
+				{ExecutionMode.BATCH, BATCH, BATCH},
+				{ExecutionMode.BATCH_FORCED, BATCH, BATCH},
+		});
+
+		// Make sure that changes to ExecutionMode are reflected in this test.
+		assertEquals(ExecutionMode.values().length, params.size());
+
+		return params;
+	}
+
+	private final ExecutionMode executionMode;
+
+	/** Expected {@link DataExchangeMode} from sources to union. */
+	private final DataExchangeMode sourceToUnion;
+
+	/** Expected {@link DataExchangeMode} from union to join. */
+	private final DataExchangeMode unionToJoin;
+
+	public UnionClosedBranchingTest(
+			ExecutionMode executionMode,
+			DataExchangeMode sourceToUnion,
+			DataExchangeMode unionToJoin) {
+
+		this.executionMode = executionMode;
+		this.sourceToUnion = sourceToUnion;
+		this.unionToJoin = unionToJoin;
+	}
+
+	@Test
+	public void testUnionClosedBranchingTest() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setExecutionMode(executionMode);
+		env.setParallelism(4);
+
+		DataSet<Tuple1<Integer>> src1 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
+
+		DataSet<Tuple1<Integer>> src2 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
+
+		DataSet<Tuple1<Integer>> union = src1.union(src2);
+
+		DataSet<Tuple2<Integer, Integer>> join = union
+				.join(union).where(0).equalTo(0)
+				.projectFirst(0).projectSecond(0);
+
+		join.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		SinkPlanNode sinkNode = optimizedPlan.getDataSinks().iterator().next();
+
+		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		// Verify that the compiler correctly sets the expected data exchange modes.
+		for (Channel channel : joinNode.getInputs()) {
+			assertEquals("Unexpected data exchange mode between union and join node.",
+					unionToJoin, channel.getDataExchangeMode());
+		}
+
+		for (SourcePlanNode src : optimizedPlan.getDataSources()) {
+			for (Channel channel : src.getOutgoingChannels()) {
+				assertEquals("Unexpected data exchange mode between source and union node.",
+						sourceToUnion, channel.getDataExchangeMode());
+			}
+		}
+
+		// -----------------------------------------------------------------------------------------
+		// Verify generated JobGraph
+		// -----------------------------------------------------------------------------------------
+
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		JobGraph jobGraph = jgg.compileJobGraph(optimizedPlan);
+
+		List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+
+		// Sanity check for the test setup
+		assertEquals("Unexpected number of vertices created.", 4, vertices.size());
+
+		// Verify all sources
+		JobVertex[] sources = new JobVertex[]{vertices.get(0), vertices.get(1)};
+
+		for (JobVertex src : sources) {
+			// Sanity check
+			assertTrue("Unexpected vertex type. Test setup is broken.", src.isInputVertex());
+
+			// The union is not translated to an extra union task, but the join uses a union
+			// input gate to read multiple inputs. The sources create a single result per consumer.
+			assertEquals("Unexpected number of created results.", 2,
+					src.getNumberOfProducedIntermediateDataSets());
+
+			for (IntermediateDataSet dataSet : src.getProducedDataSets()) {
+				ResultPartitionType dsType = dataSet.getResultType();
+
+				// The result type is determined by the channel between the union and the join node
+				// and *not* the channel between source and union.
+				if (unionToJoin.equals(BATCH)) {
+					assertTrue("Expected batch exchange, but result type is " + dsType + ".",
+							dsType.isBlocking());
+				} else {
+					assertFalse("Expected non-batch exchange, but result type is " + dsType + ".",
+							dsType.isBlocking());
+				}
+			}
+		}
+	}
+
+}