提交 52f9cf63 编写于 作者: S Stephan Ewen

No explicit caching when hash table is cached

Add tests for explicit cache removal when hash table is cached
上级 a9b1daa1
......@@ -63,4 +63,15 @@ public enum TempMode {
return this;
}
}
public TempMode makeNonCached() {
if (this == CACHED) {
return NONE;
} else if (this == CACHING_PIPELINE_BREAKER) {
return PIPELINE_BREAKER;
} else {
return this;
}
}
}
......@@ -17,6 +17,7 @@ import java.util.Collections;
import java.util.List;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.dag.TwoInputNode;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
......@@ -57,6 +58,11 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
DriverStrategy strategy;
if(!in1.isOnDynamicPath() && in2.isOnDynamicPath()) {
// sanity check that the first input is cached and remove that cache
if (!in1.getTempMode().isCached()) {
throw new CompilerException("No cache at point where static and dynamic parts meet.");
}
in1.setTempMode(in1.getTempMode().makeNonCached());
strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED;
}
else {
......
......@@ -17,6 +17,7 @@ import java.util.Collections;
import java.util.List;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.dag.TwoInputNode;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
......@@ -53,7 +54,13 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
DriverStrategy strategy;
if(!in2.isOnDynamicPath() && in1.isOnDynamicPath()) {
if (!in2.isOnDynamicPath() && in1.isOnDynamicPath()) {
// sanity check that the first input is cached and remove that cache
if (!in2.getTempMode().isCached()) {
throw new CompilerException("No cache at point where static and dynamic parts meet.");
}
in2.setTempMode(in2.getTempMode().makeNonCached());
strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED;
}
else {
......
......@@ -28,6 +28,7 @@ import eu.stratosphere.api.java.IterativeDataSet;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.dag.TempMode;
import eu.stratosphere.compiler.plan.DualInputPlanNode;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
......@@ -57,6 +58,8 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
// verify correct join strategy
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
new NepheleJobGraphGenerator().compileJobGraph(oPlan);
}
......@@ -83,6 +86,8 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
// verify correct join strategy
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, innerJoin.getDriverStrategy());
assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode());
new NepheleJobGraphGenerator().compileJobGraph(oPlan);
}
......@@ -109,7 +114,9 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
// verify correct join strategy
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy());
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy());
assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
new NepheleJobGraphGenerator().compileJobGraph(oPlan);
}
......@@ -135,7 +142,9 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
// verify correct join strategy
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy());
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy());
assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
new NepheleJobGraphGenerator().compileJobGraph(oPlan);
}
......@@ -169,13 +178,16 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
}
}
OptimizedPlan oPlan = compileNoStats(plan);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
// verify correct join strategy
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
new NepheleJobGraphGenerator().compileJobGraph(oPlan);
}
......@@ -197,16 +209,14 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
DataSet<Tuple3<Long, Long, Long>> inner;
Configuration joinStrategy = new Configuration();
joinStrategy.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
if(strategy != "") {
Configuration joinStrategy = new Configuration();
joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy);
inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
}
else {
inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner");
}
DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册