提交 0d82ed29 编写于 作者: S StephanEwen

Changed branch handling logic to correctly recognize disjoint data flow...

Changed branch handling logic to correctly recognize disjoint data flow graphs, rather then erroring out.
上级 2d44c7d1
......@@ -1138,34 +1138,37 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
}
this.closedBranchingNodes.add(alreadyClosed);
}
/*
* node IDs are assigned in graph-traversal order (pre-order)
* hence, each list is sorted by ID in ascending order and all consecutive lists start with IDs in ascending order
/**
* The node IDs are assigned in graph-traversal order (pre-order), hence, each list is sorted by ID in ascending order and
* all consecutive lists start with IDs in ascending order.
*/
protected List<UnclosedBranchDescriptor> mergeLists(List<UnclosedBranchDescriptor> child1open, List<UnclosedBranchDescriptor> child2open) {
protected final boolean mergeLists(List<UnclosedBranchDescriptor> child1open, List<UnclosedBranchDescriptor> child2open, List<UnclosedBranchDescriptor> result) {
//remove branches which have already been closed
removeClosedBranches(child1open);
removeClosedBranches(child2open);
result.clear();
// check how many open branches we have. the cases:
// 1) if both are null or empty, the result is null
// 2) if one side is null (or empty), the result is the other side.
// 3) both are set, then we need to merge.
if (child1open == null || child1open.isEmpty()) {
return child2open;
result.addAll(child2open);
return false;
}
if (child2open == null || child2open.isEmpty()) {
return child1open;
result.addAll(child1open);
return false;
}
// both have a history. merge...
ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>(4);
int index1 = child1open.size() - 1;
int index2 = child2open.size() - 1;
boolean didCloseABranch = false;
// as both lists (child1open and child2open) are sorted in ascending ID order
// we can do a merge-join-like loop which preserved the order in the result list
......@@ -1185,6 +1188,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
// match: they share a common branching child
if (id1 == id2) {
didCloseABranch = true;
// if this is the latest common child, remember it
OptimizerNode currBanchingNode = child1open.get(index1).getBranchingNode();
......@@ -1223,7 +1228,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
// merged. now we need to reverse the list, because we added the elements in reverse order
Collections.reverse(result);
return result.isEmpty() ? null : result;
return didCloseABranch;
}
/**
......
......@@ -87,7 +87,16 @@ public class SinkJoiner extends TwoInputNode {
// copy the lists and merge
List<UnclosedBranchDescriptor> result1 = new ArrayList<UnclosedBranchDescriptor>(pred1branches);
List<UnclosedBranchDescriptor> result2 = new ArrayList<UnclosedBranchDescriptor>(pred2branches);
this.openBranches = mergeLists(result1, result2);
ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
boolean didCloseSomeBranch = mergeLists(result1, result2, result);
if (!didCloseSomeBranch) {
// if the sink joiners do not close branches, then we have disjoint data flows.
throw new CompilerException("The given Pact program contains multiple disconnected data flows.");
}
this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
}
/* (non-Javadoc)
......
......@@ -16,6 +16,7 @@
package eu.stratosphere.pact.compiler.plan;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
......@@ -654,7 +655,9 @@ public abstract class TwoInputNode extends OptimizerNode {
List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection());
List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
this.openBranches = mergeLists(result1, result2);
ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
mergeLists(result1, result2, result);
this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
}
// --------------------------------------------------------------------------------------------
......
......@@ -553,6 +553,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
*
* NOTE: this case is currently not caught by the compiler. we should enable the test once it is caught.
*/
@Test
public void testBranchingDisjointPlan() {
// construct the plan
final String out1Path = "file:///test/1";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册