提交 b4135dca 编写于 作者: S sewen

Adjusted cost heuristics / comparisons.

上级 fad6d505
......@@ -35,6 +35,8 @@ package eu.stratosphere.pact.compiler.costs;
*/
public class Costs implements Comparable<Costs>, Cloneable {
public static final long UNKNOWN = -1;
private long networkCost; // network cost, in transferred bytes
private long diskCost; // bytes to be written and read, in bytes
......@@ -322,7 +324,7 @@ public class Costs implements Comparable<Costs>, Cloneable {
this.cpuCost += other.cpuCost;
}
// ---------- relative costs ----------
// ---------- heuristic costs ----------
this.heuristicNetworkCost += other.heuristicNetworkCost;
this.heuristicDiskCost += other.heuristicDiskCost;
......@@ -389,43 +391,31 @@ public class Costs implements Comparable<Costs>, Cloneable {
*/
@Override
public int compareTo(Costs o) {
// check the network cost. first heuristic, then quantifiable
if (this.heuristicNetworkCost < o.heuristicNetworkCost) {
// check the network cost. if we have actual costs on both, use them, otherwise use the heuristic costs.
if (this.networkCost > -1 && o.networkCost > -1) {
return this.networkCost < o.networkCost ? -1 : this.networkCost > o.networkCost ? 1 : 0;
} else if (this.heuristicNetworkCost < o.heuristicNetworkCost) {
return -1;
} else if (this.heuristicNetworkCost > o.heuristicNetworkCost) {
return 1;
} else if (this.networkCost != -1 && (this.networkCost < o.networkCost || o.networkCost == -1)) {
return -1;
} else if (o.networkCost != -1 && (this.networkCost > o.networkCost || this.networkCost == -1)) {
return 1;
} else if (this.networkCost == -1 && o.networkCost == -1) {
// both have unknown network cost (and equal or no heuristic net cost). treat the costs as equal
return 0;
}
// next, check the disk cost. again heuristic before quantifiable
if (this.heuristicDiskCost < o.heuristicDiskCost) {
// next, check the disk cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs.
if (this.diskCost > -1 && o.diskCost > -1) {
return this.diskCost < o.diskCost ? -1 : this.diskCost > o.diskCost ? 1 : 0;
} else if (this.heuristicDiskCost < o.heuristicDiskCost) {
return -1;
} else if (this.heuristicDiskCost > o.heuristicDiskCost) {
return 1;
} else if (this.diskCost != -1 && (this.diskCost < o.diskCost || o.diskCost == -1)) {
return -1;
} else if (o.diskCost != -1 && (this.diskCost > o.diskCost || this.diskCost == -1)) {
return 1;
} else if (this.diskCost == -1 && o.diskCost == -1) {
// both have unknown disk cost (and equal or no heuristic net cost). treat the costs as equal
return 0;
}
// finally, check the CPU cost. heuristic before quantifiable
if (this.heuristicCpuCost < o.heuristicCpuCost) {
// next, check the disk cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs.
if (this.cpuCost > -1 && o.cpuCost > -1) {
return this.cpuCost < o.cpuCost ? -1 : this.cpuCost > o.cpuCost ? 1 : 0;
} else if (this.heuristicCpuCost < o.heuristicCpuCost) {
return -1;
} else if (this.heuristicCpuCost > o.heuristicCpuCost) {
return 1;
} else if (this.cpuCost != -1 && (this.cpuCost < o.cpuCost || o.cpuCost == -1)) {
return -1;
} else if (o.cpuCost != -1 && (this.cpuCost > o.cpuCost || this.cpuCost == -1)) {
return 1;
} else {
return 0;
}
......
......@@ -48,10 +48,11 @@ public class DefaultCostEstimator extends CostEstimator
// partitioning. no disk costs.
final long estOutShipSize = estimates.getEstimatedOutputSize();
if (estOutShipSize <= 0) {
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
costs.setNetworkCost(Costs.UNKNOWN);
} else {
costs.addNetworkCost(estOutShipSize);
}
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
}
@Override
......@@ -60,10 +61,11 @@ public class DefaultCostEstimator extends CostEstimator
// partitioning. no disk costs.
final long estOutShipSize = estimates.getEstimatedOutputSize();
if (estOutShipSize <= 0) {
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
costs.setNetworkCost(Costs.UNKNOWN);
} else {
costs.addNetworkCost(estOutShipSize);
}
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
}
@Override
......@@ -74,14 +76,14 @@ public class DefaultCostEstimator extends CostEstimator
final long sampled = (long) (dataSize * 0.1f);
// set shipping costs
costs.addNetworkCost(dataSize + sampled);
// we assume a two phase merge sort, so all in all 2 I/O operations per block
costs.addDiskCost(2 * sampled);
} else {
// no costs known. use the same assumption as above on the heuristic costs
final long sampled = (long) (HEURISTIC_COST_BASE * 0.1f);
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE + sampled);
costs.addHeuristicDiskCost(2 * sampled);
costs.setNetworkCost(Costs.UNKNOWN);
}
// no costs known. use the same assumption as above on the heuristic costs
final long sampled = (long) (HEURISTIC_COST_BASE * 0.1f);
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE + sampled);
costs.addHeuristicDiskCost(2 * sampled);
}
@Override
......@@ -89,10 +91,11 @@ public class DefaultCostEstimator extends CostEstimator
// assumption: we need ship the whole data over the network to each node.
final long estOutShipSize = estimates.getEstimatedOutputSize();
if (estOutShipSize <= 0) {
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor);
costs.setNetworkCost(Costs.UNKNOWN);
} else {
costs.addNetworkCost(replicationFactor * estOutShipSize);
}
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor);
}
// --------------------------------------------------------------------------------------------
......@@ -104,8 +107,9 @@ public class DefaultCostEstimator extends CostEstimator
if (fileSizeInBytes >= 0) {
costs.addDiskCost(fileSizeInBytes);
} else {
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE);
costs.setDiskCost(Costs.UNKNOWN);
}
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE);
}
@Override
......@@ -113,10 +117,11 @@ public class DefaultCostEstimator extends CostEstimator
final long s = estimates.getEstimatedOutputSize();
// we assume a two phase merge sort, so all in all 2 I/O operations per block
if (s <= 0) {
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.setDiskCost(Costs.UNKNOWN);
} else {
costs.addDiskCost(2 * s);
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
}
@Override
......@@ -143,8 +148,9 @@ public class DefaultCostEstimator extends CostEstimator
if (bs > 0 && ps > 0) {
costs.addDiskCost(bs + ps);
} else {
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.setDiskCost(Costs.UNKNOWN);
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
}
@Override
......@@ -158,9 +164,11 @@ public class DefaultCostEstimator extends CostEstimator
costs.addDiskCost(oc * is);
}
} else {
// hack: assume 100k loops (should be expensive enough)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 100000);
costs.setDiskCost(Costs.UNKNOWN);
}
// hack: assume 100k loops (should be expensive enough)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 100000);
}
@Override
......@@ -172,9 +180,11 @@ public class DefaultCostEstimator extends CostEstimator
long loops = Math.max(os / blockSize, 1);
costs.addDiskCost(loops * is);
} else {
// hack: assume 1k loops (much cheaper than the streamed variant!)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 1000);
costs.setDiskCost(Costs.UNKNOWN);
}
// hack: assume 1k loops (much cheaper than the streamed variant!)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 1000);
}
// --------------------------------------------------------------------------------------------
......@@ -189,9 +199,10 @@ public class DefaultCostEstimator extends CostEstimator
final long s = estimates.getEstimatedOutputSize();
// we assume spilling and re-reading
if (s <= 0) {
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.setDiskCost(Costs.UNKNOWN);
} else {
costs.addDiskCost(2 * s);
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册