提交 48a703fd 编写于 作者: M Markus Holzemer 提交者: Stephan Ewen

[FLINK-836] Integration of the CachedBuildSideMatchDriver into the optimizer

上级 7b6295d9
......@@ -42,13 +42,15 @@ public abstract class CostEstimator {
// Costs for local (per-task) strategies. Each cost method comes in two variants:
// the plain form, and an overload taking an additional {@code costWeight} that scales
// the estimate (used for operators on the dynamic path of an iteration, where the
// work is executed once per iteration).
public abstract void addLocalSortCost(EstimateProvider estimates, long memorySize, Costs costs);
public abstract void addLocalMergeCost(EstimateProvider estimates1, EstimateProvider estimates2, long memorySize, Costs costs);
public abstract void addLocalMergeCost(EstimateProvider estimates1, EstimateProvider estimates2, long memorySize, Costs costs, int costWeight);
public abstract void addHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, long memorySize, Costs costs);
public abstract void addHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, long memorySize, Costs costs, int costWeight);
// Cached variant of the hybrid hash join: the build side is materialized once and
// reused across iterations (see DriverStrategy.HYBRIDHASH_BUILD_*_CACHED).
public abstract void addCachedHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, long memorySize, Costs costs, int costWeight);
public abstract void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs);
public abstract void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs, int costWeight);
public abstract void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs);
public abstract void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs, int costWeight);
// ------------------------------------------------------------------------
......@@ -135,6 +137,12 @@ public abstract class CostEstimator {
Channel firstInput = null;
Channel secondInput = null;
Costs driverCosts = new Costs();
int costWeight = 1;
// adjust with the cost weight factor
if (n.isOnDynamicPath()) {
costWeight = n.getCostWeight();
}
// get the inputs, if we have some
{
......@@ -173,35 +181,36 @@ public abstract class CostEstimator {
break;
case MERGE:
addLocalMergeCost(firstInput, secondInput, availableMemory, driverCosts);
addLocalMergeCost(firstInput, secondInput, availableMemory, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_FIRST:
addHybridHashCosts(firstInput, secondInput, availableMemory, driverCosts);
addHybridHashCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_SECOND:
addHybridHashCosts(secondInput, firstInput, availableMemory, driverCosts);
addHybridHashCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_FIRST_CACHED:
addCachedHybridHashCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_SECOND_CACHED:
addCachedHybridHashCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
break;
case NESTEDLOOP_BLOCKED_OUTER_FIRST:
addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts);
addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
break;
case NESTEDLOOP_BLOCKED_OUTER_SECOND:
addBlockNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts);
addBlockNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
break;
case NESTEDLOOP_STREAMED_OUTER_FIRST:
addStreamedNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts);
addStreamedNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
break;
case NESTEDLOOP_STREAMED_OUTER_SECOND:
addStreamedNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts);
addStreamedNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
break;
default:
throw new CompilerException("Unknown local strategy: " + n.getDriverStrategy().name());
}
// adjust with the cost weight factor
if (n.isOnDynamicPath()) {
driverCosts.multiplyWith(n.getCostWeight());
}
totalCosts.addCosts(driverCosts);
n.setCosts(totalCosts);
}
......
......@@ -141,13 +141,13 @@ public class DefaultCostEstimator extends CostEstimator {
}
@Override
public void addLocalMergeCost(EstimateProvider input1, EstimateProvider input2, long availableMemory, Costs costs) {
public void addLocalMergeCost(EstimateProvider input1, EstimateProvider input2, long availableMemory, Costs costs, int costWeight) {
// costs nothing. the very rarely incurred cost for a spilling block nested loops join in the
// presence of massively re-occurring duplicate keys is ignored, because not accessible.
}
@Override
public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, long availableMemory, Costs costs) {
public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, long availableMemory, Costs costs, int costWeight) {
long bs = buildSideInput.getEstimatedOutputSize();
long ps = probeSideInput.getEstimatedOutputSize();
......@@ -160,10 +160,32 @@ public class DefaultCostEstimator extends CostEstimator {
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
costs.multiplyWith(costWeight);
}
/**
 * Adds the cost estimate for the cached variant of the hybrid hash join.
 * By default we assume that half of the cached hash table fits into memory,
 * so half of the I/O volume is paid only once (when the table is built) and the
 * other half recurs on every weighted pass.
 *
 * @param buildSideInput Estimates for the (cached) build side.
 * @param probeSideInput Estimates for the probe side.
 * @param availableMemory The memory available to the operator (not used in the formula).
 * @param costs The costs object the estimate is added to.
 * @param costWeight Weight factor for work repeated per iteration.
 */
@Override
public void addCachedHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, long availableMemory, Costs costs, int costWeight) {
	final long buildSize = buildSideInput.getEstimatedOutputSize();
	final long probeSize = probeSideInput.getEstimatedOutputSize();

	if (buildSize <= 0 || probeSize <= 0) {
		// no usable size estimates: quantifiable I/O and CPU cost are unknown
		costs.setDiskCost(Costs.UNKNOWN);
		costs.setCpuCost(Costs.UNKNOWN);
	} else {
		final long total = 2 * buildSize + probeSize;
		final long half = total / 2;
		// one half is paid once (building the cached table), the other half per weighted pass
		costs.addDiskCost(half + half * costWeight);
		costs.addCpuCost((long) (total * HASHING_CPU_FACTOR));
	}

	// heuristic fallback costs, scaled by the iteration weight
	costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE * costWeight);
	costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * HASHING_CPU_FACTOR * costWeight));
}
@Override
public void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs) {
public void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs, int costWeight) {
long is = innerSide.getEstimatedOutputSize();
long oc = outerSide.getEstimatedNumRecords();
......@@ -181,10 +203,11 @@ public class DefaultCostEstimator extends CostEstimator {
// hack: assume 100k loops (should be expensive enough)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 100000);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * 100000 * MATERIALIZATION_CPU_FACTOR));
costs.multiplyWith(costWeight);
}
@Override
public void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs) {
public void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs, int costWeight) {
long is = innerSide.getEstimatedOutputSize();
long os = outerSide.getEstimatedOutputSize();
......@@ -200,6 +223,7 @@ public class DefaultCostEstimator extends CostEstimator {
// hack: assume 1k loops (much cheaper than the streamed variant!)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 1000);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * 1000 * MATERIALIZATION_CPU_FACTOR));
costs.multiplyWith(costWeight);
}
// --------------------------------------------------------------------------------------------
......
......@@ -92,6 +92,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
protected boolean onDynamicPath;
protected boolean insideIteration;
protected List<PlanNode> cachedPlans; // cache candidates, because the may be accessed repeatedly
protected int[][] remappedKeys;
......@@ -499,6 +501,14 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
}
}
/**
 * Checks whether this node is contained inside an iteration.
 *
 * @return True, if the node lies inside an iteration, false otherwise.
 */
public boolean isInsideIteration() {
	return insideIteration;
}

/**
 * Marks this node as lying inside or outside an iteration.
 *
 * @param insideIteration True, if the node is inside an iteration, false otherwise.
 */
public void setInsideIteration(boolean insideIteration) {
	this.insideIteration = insideIteration;
}
/**
* Checks whether this node has branching output. A node's output is branched, if it has more
* than one output connection.
......
......@@ -54,7 +54,15 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
/**
 * Instantiates the join as a hybrid hash join with the first input as the build side.
 * If the build side is static (not on the dynamic path) while both inputs are inside an
 * iteration, the cached variant is chosen, so the hash table is built once and reused
 * across iterations.
 *
 * @param in1 The channel for the first input (build side).
 * @param in2 The channel for the second input (probe side).
 * @param node The two-input node this candidate is created for.
 * @return The instantiated plan node candidate.
 */
@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
	// Bug fix: the pre-edit unconditional return statement had been left above this
	// logic, making it unreachable; it is removed here.
	DriverStrategy strategy;
	if (!in1.isOnDynamicPath() && in1.isInsideIteration() && in2.isInsideIteration()) {
		strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED;
	} else {
		strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST;
	}
	return new DualInputPlanNode(node, "Join("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
}
@Override
......
......@@ -51,7 +51,15 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor
/**
 * Instantiates the join as a hybrid hash join with the second input as the build side.
 * If the build side is static (not on the dynamic path) while both inputs are inside an
 * iteration, the cached variant is chosen, so the hash table is built once and reused
 * across iterations.
 *
 * @param in1 The channel for the first input (probe side).
 * @param in2 The channel for the second input (build side).
 * @param node The two-input node this candidate is created for.
 * @return The instantiated plan node candidate.
 */
@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
	// Bug fix: the pre-edit unconditional return statement had been left above this
	// logic, making it unreachable; it is removed here.
	DriverStrategy strategy;
	if (!in2.isOnDynamicPath() && in1.isInsideIteration() && in2.isInsideIteration()) {
		strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED;
	} else {
		strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND;
	}
	return new DualInputPlanNode(node, "Join("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
}
@Override
......
......@@ -306,6 +306,10 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
return this.source.isOnDynamicPath();
}
/**
 * Checks whether this channel lies inside an iteration, by delegating
 * to the channel's source node.
 *
 * @return True, if the channel's source is inside an iteration, false otherwise.
 */
public boolean isInsideIteration() {
	return this.source.isInsideIteration();
}

/**
 * Gets the cost weight of the channel's source node.
 *
 * @return The source node's cost weight.
 */
public int getCostWeight() {
	return this.source.getCostWeight();
}
......
......@@ -423,6 +423,10 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
return this.template.getCostWeight();
}
/**
 * Checks whether this plan node lies inside an iteration, by delegating
 * to the optimizer node this plan node was created from.
 *
 * @return True, if the node is inside an iteration, false otherwise.
 */
public boolean isInsideIteration() {
	return this.template.isInsideIteration();
}
// --------------------------------------------------------------------------------------------
/**
......
......@@ -67,6 +67,10 @@ public enum DriverStrategy {
HYBRIDHASH_BUILD_FIRST(MatchDriver.class, null, FULL_DAM, MATERIALIZING, true),
// the second input is build side, the first side is probe side of a hybrid hash table
HYBRIDHASH_BUILD_SECOND(MatchDriver.class, null, MATERIALIZING, FULL_DAM, true),
// a cached variant of HYBRIDHASH_BUILD_FIRST, that can only be used inside of iterations
HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedMatchDriver.class, null, FULL_DAM, MATERIALIZING, true),
// cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used inside of iterations
HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING, FULL_DAM, true),
// the second input is inner loop, the first input is outer loop and block-wise processed
NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, MATERIALIZING, false),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册