Commit ac80458f authored by Stephan Ewen

[FLINK-1315] [optimizer] Fix bug in branch tracking logic.

Parent 9d02f2a4
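In short, the fix touches three areas: SingleInputPlanNode stops aliasing its predecessor's branchPlan map and instead copies the entries into a fresh HashMap, so a later merge can no longer mutate the predecessor's bookkeeping; the mergeBranchPlanMaps logic of BulkIterationPlanNode and WorksetIterationPlanNode is corrected; and BranchingPlansCompilerTest gains two tests for plans whose only branching happens through broadcast variables (plus a small adjustment in IterationsCompilerTest).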
@@ -142,17 +142,17 @@ public class BulkIterationPlanNode extends SingleInputPlanNode implements Iterat
 	}
 	
 	private void mergeBranchPlanMaps() {
-		for(OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()){
+		for (OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()) {
 			OptimizerNode brancher = desc.getBranchingNode();
-			if(branchPlan == null) {
+			if (branchPlan == null) {
 				branchPlan = new HashMap<OptimizerNode, PlanNode>(6);
 			}
-			if(!branchPlan.containsKey(brancher)){
+			if (!branchPlan.containsKey(brancher)) {
 				PlanNode selectedCandidate = null;
-				if(rootOfStepFunction.branchPlan != null){
+				if (rootOfStepFunction.branchPlan != null) {
 					selectedCandidate = rootOfStepFunction.branchPlan.get(brancher);
 				}
......
@@ -25,6 +25,7 @@ import static org.apache.flink.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOU
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.flink.api.common.operators.util.FieldList;
@@ -84,12 +85,14 @@ public class SingleInputPlanNode extends PlanNode {
 		}
 		
 		final PlanNode predNode = input.getSource();
-		if (this.branchPlan == null) {
-			this.branchPlan = predNode.branchPlan;
-		} else if (predNode.branchPlan != null) {
+		if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+			}
 			this.branchPlan.putAll(predNode.branchPlan);
 		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
......
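The functional change in the hunk above replaces aliasing with a defensive copy: previously, this.branchPlan = predNode.branchPlan; made the node share its predecessor's map, so a later putAll() on one node silently mutated the other's branch tracking. The new isEmpty() guard also keeps nodes without open branches from allocating maps at all. A minimal standalone sketch of the aliasing hazard (plain Java with String keys standing in for OptimizerNode/PlanNode; illustrative only, not Flink code):

import java.util.HashMap;
import java.util.Map;

// Sketch of the aliasing hazard the patch removes: adopting a map by
// reference means later writes leak back into the predecessor's map.
public class BranchPlanAliasing {
	public static void main(String[] args) {
		Map<String, String> predecessor = new HashMap<String, String>();
		predecessor.put("brancherA", "candidateA");

		// Old behavior: adopt the predecessor's map by reference...
		Map<String, String> aliased = predecessor;
		aliased.put("brancherB", "candidateB");
		// ...and the predecessor's bookkeeping silently changes.
		System.out.println(predecessor.size()); // prints 2

		// Patched behavior: copy into a fresh map before merging.
		Map<String, String> copied = new HashMap<String, String>(predecessor);
		copied.put("brancherC", "candidateC");
		System.out.println(predecessor.size()); // still 2
	}
}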
@@ -200,11 +200,9 @@ public class WorksetIterationPlanNode extends DualInputPlanNode implements Itera
 	 * because they can also contain some of the branching nodes.
 	 */
 	@Override
-	protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode,
-			PlanNode> branchPlan2){
-	}
+	protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {}
 	
 	protected void mergeBranchPlanMaps() {
 		Map<OptimizerNode, PlanNode> branchPlan1 = input1.getSource().branchPlan;
 		Map<OptimizerNode, PlanNode> branchPlan2 = input2.getSource().branchPlan;
......
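For the two-input WorksetIterationPlanNode, the inherited two-argument mergeBranchPlanMaps(...) is overridden as a deliberate no-op (per the Javadoc above, the iteration's own inputs can contain branching nodes), and a zero-argument variant reads the branch maps of both inputs directly; the merge body itself is truncated in this view. A hedged sketch of such a two-map union, assuming it simply combines both predecessors' entries into a fresh map (an assumption, not the literal Flink implementation):

import java.util.HashMap;
import java.util.Map;

// Hedged sketch: union the branch maps of two inputs into a fresh map,
// tolerating null inputs, so neither predecessor's map is mutated.
final class TwoInputBranchMerge {
	static <K, V> Map<K, V> union(Map<K, V> first, Map<K, V> second) {
		Map<K, V> merged = new HashMap<K, V>();
		if (first != null) {
			merged.putAll(first);
		}
		if (second != null) {
			merged.putAll(second);
		}
		return merged;
	}
}

Returning a fresh map mirrors the defensive-copy direction of the SingleInputPlanNode change: neither input's bookkeeping is ever written to.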
@@ -19,6 +19,8 @@
 package org.apache.flink.compiler;
 
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
@@ -30,7 +32,9 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.record.operators.BulkIteration;
 import org.apache.flink.api.java.record.operators.CoGroupOperator;
 import org.apache.flink.api.java.record.operators.CrossOperator;
@@ -40,9 +44,11 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.JoinOperator;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.SinkPlanNode;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
 import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
@@ -1017,4 +1023,73 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			Assert.fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testBranchesOnlyInBCVariables1() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> input = env.generateSequence(1, 10);
+			DataSet<Long> bc_input = env.generateSequence(1, 10);
+			
+			input
+				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
+				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
+				.print();
+			
+			Plan plan = env.createProgramPlan();
+			compileNoStats(plan);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBranchesOnlyInBCVariables2() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
+			
+			DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
+			DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 2");
+			
+			DataSet<Tuple2<Long, Long>> joinInput1 =
+					input.map(new IdentityMapper<Tuple2<Long,Long>>())
+						.withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
+						.withBroadcastSet(bc_input2, "bc2");
+			
+			DataSet<Tuple2<Long, Long>> joinInput2 =
+					input.map(new IdentityMapper<Tuple2<Long,Long>>())
+						.withBroadcastSet(bc_input1, "bc1")
+						.withBroadcastSet(bc_input2, "bc2");
+			
+			DataSet<Tuple2<Long, Long>> joinResult = joinInput1
+				.join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
+				.with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
+			
+			input
+				.map(new IdentityMapper<Tuple2<Long,Long>>())
+					.withBroadcastSet(bc_input1, "bc1")
+				.union(joinResult)
+				.print();
+			
+			Plan plan = env.createProgramPlan();
+			compileNoStats(plan);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+		
+		@Override
+		public Tuple2<T, T> map(T value) {
+			return new Tuple2<T, T>(value, value);
+		}
+	}
 }
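As their names suggest, the two new tests cover plans whose only branching occurs through broadcast-set inputs: the first broadcasts one source into two consecutive maps, the second mixes broadcast branches with a join and a union. Both simply assert that such plans compile (compileNoStats), which exercises exactly the branch-tracking paths repaired by this commit.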
@@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			
 			new NepheleJobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
......