diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java index 7fd8993e00f9a04ac2cca81d123acf4ff89fe9f9..3f413b55a81596ee1ab0a0c61789831b6d8786af 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java @@ -142,17 +142,17 @@ public class BulkIterationPlanNode extends SingleInputPlanNode implements Iterat } private void mergeBranchPlanMaps() { - for(OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()){ + for (OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()) { OptimizerNode brancher = desc.getBranchingNode(); - if(branchPlan == null) { + if (branchPlan == null) { branchPlan = new HashMap(6); } - if(!branchPlan.containsKey(brancher)){ + if (!branchPlan.containsKey(brancher)) { PlanNode selectedCandidate = null; - if(rootOfStepFunction.branchPlan != null){ + if (rootOfStepFunction.branchPlan != null) { selectedCandidate = rootOfStepFunction.branchPlan.get(brancher); } diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java index 9b9202bba61112f05e975019a26c39348c474490..d77d82ef1a706a430c46ab5965c7a1c6553ebc1b 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java @@ -25,6 +25,7 @@ import static org.apache.flink.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOU import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import org.apache.flink.api.common.operators.util.FieldList; @@ -84,12 +85,14 @@ public class SingleInputPlanNode extends PlanNode { } final PlanNode predNode = input.getSource(); - if (this.branchPlan == null) { - this.branchPlan = predNode.branchPlan; - } else if (predNode.branchPlan != null) { + + if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) { + + if (this.branchPlan == null) { + this.branchPlan = new HashMap(); + } this.branchPlan.putAll(predNode.branchPlan); } - } // -------------------------------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java index 8d38814ff485fdac77f799cf56df8353085c4e39..47e9b69879de7cc5d42549e9a4595cb1df9117c1 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java @@ -200,11 +200,9 @@ public class WorksetIterationPlanNode extends DualInputPlanNode implements Itera * because they can contain also some of the branching nodes. */ @Override - protected void mergeBranchPlanMaps(Map branchPlan1, Map branchPlan2){ - - } + protected void mergeBranchPlanMaps(Map branchPlan1, Map branchPlan2) {} + protected void mergeBranchPlanMaps() { Map branchPlan1 = input1.getSource().branchPlan; Map branchPlan2 = input2.getSource().branchPlan; diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java index 2b4ff021bcfcd2a593196f5fe9f902ef81af39a8..ae185f9e5803814b1d2094074d6311ec7e6168b5 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java @@ -19,6 +19,8 @@ package org.apache.flink.compiler; +import static org.junit.Assert.*; + import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -30,7 +32,9 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichJoinFunction; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.java.record.operators.BulkIteration; import org.apache.flink.api.java.record.operators.CoGroupOperator; import org.apache.flink.api.java.record.operators.CrossOperator; @@ -40,9 +44,11 @@ import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.JoinOperator; import org.apache.flink.api.java.record.operators.MapOperator; import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.compiler.plan.OptimizedPlan; import org.apache.flink.compiler.plan.SinkPlanNode; import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction; import org.apache.flink.compiler.testfunctions.IdentityGroupReducer; import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor; import org.apache.flink.compiler.testfunctions.IdentityMapper; @@ -1017,4 +1023,73 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { Assert.fail(e.getMessage()); } } + + @Test + public void testBranchesOnlyInBCVariables1() { + try{ + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input = env.generateSequence(1, 10); + DataSet bc_input = env.generateSequence(1, 10); + + input + .map(new IdentityMapper()).withBroadcastSet(bc_input, "name1") + .map(new IdentityMapper()).withBroadcastSet(bc_input, "name2") + .print(); + + Plan plan = env.createProgramPlan(); + compileNoStats(plan); + } + catch(Exception e){ + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testBranchesOnlyInBCVariables2() { + try{ + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input = env.generateSequence(1, 10).map(new Duplicator()).name("proper input"); + + DataSet bc_input1 = env.generateSequence(1, 10).name("BC input 1"); + DataSet bc_input2 = env.generateSequence(1, 10).name("BC input 1"); + + DataSet> joinInput1 = + input.map(new IdentityMapper>()) + .withBroadcastSet(bc_input1.map(new IdentityMapper()), "bc1") + .withBroadcastSet(bc_input2, "bc2"); + + DataSet> joinInput2 = + input.map(new IdentityMapper>()) + .withBroadcastSet(bc_input1, "bc1") + .withBroadcastSet(bc_input2, "bc2"); + + DataSet> joinResult = joinInput1 + .join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1) + .with(new DummyFlatJoinFunction>()); + + input + .map(new IdentityMapper>()) + .withBroadcastSet(bc_input1, "bc1") + .union(joinResult) + .print(); + + Plan plan = env.createProgramPlan(); + compileNoStats(plan); + } + catch(Exception e){ + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static final class Duplicator implements MapFunction> { + + @Override + public Tuple2 map(T value) { + return new Tuple2(value, value); + } + } } diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java index 9ceb25c973812bae51280781e0a8e90a4cd63105..23b7cfeeda380aefed215d72739944090192a584 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java @@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase { Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); - + new NepheleJobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace();