提交 8a0a3518 编写于 作者: R Robert Metzger

Rename SetId() to setId()

Hopefully fixed a bug in the pact compiler for branching plans:
	- added a test case provided by Anja Kunkel from HU Berlin
	- my understanding of the bug is the following: During a check if additional pipeline breakers are necessary,
	the compiler checks for the LocalStrategy of a SinkJoinerPlan. The SinkJoinerPlan is a "fake" plan element to group multiple output sinks into one sink (the compiler assumes one root node, this is why a "fake" plan node is required).
	The local strategy of SinkJoinerPlan is initialized as NONE, which means that we just pass through the elements (which makes sence since we are talking about a sink) The initialization of a NONE-local strategy assumes only one input, not two.
	I replaced the LocalStrategy with BINARY_NO_OP, which is a NONE for two input.
	The compiler is now able to do its pipline breaker checks.
上级 967a6e2e
......@@ -677,7 +677,7 @@ public class PactCompiler {
while (iter.hasNext()) {
rootNode = new SinkJoiner(rootNode, iter.next());
rootNode.SetId(id++);
rootNode.setId(id++);
}
} else {
throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
......@@ -962,7 +962,7 @@ public class PactCompiler {
if (n.getId() > 0) {
return;
}
n.SetId(this.id);
n.setId(this.id);
// first connect to the predecessors
n.setInputs(this.con2node);
......
......@@ -153,7 +153,7 @@ public abstract class CostEstimator {
// determine the local costs
switch (n.getDriverStrategy()) {
case NONE:
case BINARY_NO_OP:
case MAP:
case ALL_GROUP:
......
......@@ -260,7 +260,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
* @param id
* The id for this node.
*/
public void SetId(int id) {
public void setId(int id) {
this.id = id;
}
......@@ -819,7 +819,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
lastUnion.setSubtasksPerInstance(getSubtasksPerInstance());
//push id down to newly created union node
lastUnion.SetId(this.id);
lastUnion.setId(this.id);
this.id++;
}
......
......@@ -28,7 +28,7 @@ import eu.stratosphere.pact.runtime.task.DriverStrategy;
public class SinkJoinerPlanNode extends DualInputPlanNode {
public SinkJoinerPlanNode(SinkJoiner template, Channel input1, Channel input2) {
super(template, input1, input2, DriverStrategy.NONE);
super(template, input1, input2, DriverStrategy.BINARY_NO_OP);
}
// --------------------------------------------------------------------------------------------
......
......@@ -48,6 +48,74 @@ import eu.stratosphere.pact.compiler.util.IdentityReduce;
/**
*/
public class BranchingPlansCompilerTest extends CompilerTestBase {
/**
*
* <pre>
* (SRC A)
* |
* (MAP A)
* / \
* (MAP B) (MAP C)
* / / \
* (SINK A) (SINK B) (SINK C)
* </pre>
*/
@Test
public void testBranchingWithMultipleDataSinks2() {
try {
// construct the plan
final String out1Path = "file:///test/1";
final String out2Path = "file:///test/2";
final String out3Path = "file:///test/3";
FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
MapContract mapA = MapContract.builder(IdentityMap.class).input(sourceA).name("Map A").build();
MapContract mapB = MapContract.builder(IdentityMap.class).input(mapA).name("Map B").build();
MapContract mapC = MapContract.builder(IdentityMap.class).input(mapA).name("Map C").build();
FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, mapB, "Sink A");
FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, mapC, "Sink B");
FileDataSink sinkC = new FileDataSink(DummyOutputFormat.class, out3Path, mapC, "Sink C");
List<GenericDataSink> sinks = new ArrayList<GenericDataSink>();
sinks.add(sinkA);
sinks.add(sinkB);
sinks.add(sinkC);
// return the PACT plan
Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
OptimizedPlan oPlan = compileNoStats(plan);
// ---------- check the optimizer plan ----------
// number of sinks
Assert.assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
// sinks contain all sink paths
Set<String> allSinks = new HashSet<String>();
allSinks.add(out1Path);
allSinks.add(out2Path);
allSinks.add(out3Path);
for (SinkPlanNode n : oPlan.getDataSinks()) {
String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath();
Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
}
// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
jobGen.compileJobGraph(oPlan);
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
/**
* <pre>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册