提交 2414e7f5 编写于 作者: S sewen

Reworking of cost estimation.

Reworking of data properties.
上级 9805f397
......@@ -31,7 +31,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JSONGenerator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
......@@ -63,7 +63,7 @@ public class Client {
nepheleConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
nepheleConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
this.compiler = new PactCompiler(new DataStatistics(), new FixedSizeClusterCostEstimator(), jobManagerAddress);
this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress);
}
/**
......@@ -87,7 +87,7 @@ public class Client {
}
final InetSocketAddress jobManagerAddress = new InetSocketAddress(address, port);
this.compiler = new PactCompiler(new DataStatistics(), new FixedSizeClusterCostEstimator(), jobManagerAddress);
this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress);
}
......
......@@ -77,7 +77,7 @@ import eu.stratosphere.pact.common.type.Value;
import eu.stratosphere.pact.common.util.PactConfigConstants;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
......@@ -1075,7 +1075,7 @@ public class TestPlan implements Closeable {
}
}
private static final class CostEstimator extends FixedSizeClusterCostEstimator {
private static final class CostEstimator extends DefaultCostEstimator {
private CostEstimator() {
super();
}
......
......@@ -68,6 +68,18 @@ public class Costs implements Comparable<Costs>, Cloneable {
public void setNetworkCost(long bytes) {
this.networkCost = bytes;
}
/**
 * Increases the network cost of this Costs object by the given amount.
 * If either the current network cost or the given amount is negative,
 * the network cost becomes <code>-1</code>.
 *
 * @param bytes
 *        The network cost to add, in bytes to be transferred.
 */
public void addNetworkCost(long bytes) {
    if (this.networkCost >= 0 && bytes >= 0) {
        // both sides are non-negative: accumulate
        this.networkCost = this.networkCost + bytes;
    } else {
        // a negative value on either side yields -1
        this.networkCost = -1;
    }
}
/**
* Gets the costs for secondary storage.
......@@ -87,6 +99,18 @@ public class Costs implements Comparable<Costs>, Cloneable {
public void setSecondaryStorageCost(long bytes) {
this.secondaryStorageCost = bytes;
}
/**
 * Increases the secondary storage cost of this Costs object by the given amount.
 * If either the current secondary storage cost or the given amount is negative,
 * the secondary storage cost becomes <code>-1</code>.
 *
 * @param bytes
 *        The secondary storage cost to add, in bytes to be written and read.
 */
public void addSecondaryStorageCost(long bytes) {
    if (this.secondaryStorageCost >= 0 && bytes >= 0) {
        // both sides are non-negative: accumulate
        this.secondaryStorageCost = this.secondaryStorageCost + bytes;
    } else {
        // a negative value on either side yields -1
        this.secondaryStorageCost = -1;
    }
}
/**
* Adds the given costs to these costs. If for one of the different cost components (network, secondary storage),
......
......@@ -27,7 +27,7 @@ import eu.stratosphere.pact.common.io.statistics.BaseStatistics;
* <p>
* This class is thread safe.
*
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
* @author Stephan Ewen
*/
public class DataStatistics
{
......@@ -38,8 +38,7 @@ public class DataStatistics
/**
* Creates a new statistics object, with an empty cache.
*/
public DataStatistics()
{
public DataStatistics() {
this.baseStatisticsCache = new HashMap<String, BaseStatistics>();
}
......
......@@ -15,19 +15,22 @@
package eu.stratosphere.pact.compiler;
import eu.stratosphere.pact.common.contract.Order;
import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
/**
* This class represents global properties of the data. Global properties are properties that
* describe data across different partitions.
* <p>
* Currently, the properties are the following: A partitioning type (ANY, HASH, RANGE), and EITHER an ordering (for range partitioning)
* or a FieldSet with the hash partitioning columns.
*/
public final class GlobalProperties implements Cloneable
{
private PartitionProperty partitioning; // the type partitioning
private OptimizerFieldSet partitioningFields;
private OptimizerFieldSet partitioningFields; // the fields which are partitioned
private Ordering ordering; // order of the partitioned fields, if it is an ordered (range) range partitioning
......@@ -47,7 +50,15 @@ public final class GlobalProperties implements Cloneable
public GlobalProperties(PartitionProperty partitioning, Ordering ordering) {
this.partitioning = partitioning;
this.ordering = ordering;
this.partitioningFields = OptimizerFieldList.getFromOrdering(ordering);
}
/**
 * Creates global properties with the given partitioning type on the given fields.
 * This constructor does not assign an ordering.
 *
 * @param partitioning The type of the partitioning.
 * @param partitioningFields The fields which are partitioned.
 */
public GlobalProperties(PartitionProperty partitioning, OptimizerFieldSet partitioningFields) {
    this.partitioningFields = partitioningFields;
    this.partitioning = partitioning;
}
// --------------------------------------------------------------------------------------------
......@@ -67,7 +78,6 @@ public final class GlobalProperties implements Cloneable
public void setPartitioning(PartitionProperty partitioning, Ordering ordering) {
this.partitioning = partitioning;
this.ordering = ordering;
this.partitioningFields = OptimizerFieldList.getFromOrdering(ordering);
}
/**
......@@ -79,8 +89,13 @@ public final class GlobalProperties implements Cloneable
return partitioning;
}
/**
* Gets the fields on which the data is partitioned.
*
* @return The partitioning fields.
*/
public OptimizerFieldSet getPartitionedFields() {
return this.partitioning == PartitionProperty.NONE ? null : this.partitioningFields;
return this.partitioningFields;
}
/**
......@@ -105,6 +120,7 @@ public final class GlobalProperties implements Cloneable
public void reset() {
this.partitioning = PartitionProperty.NONE;
this.ordering = null;
this.partitioningFields = null;
}
/**
......@@ -114,85 +130,60 @@ public final class GlobalProperties implements Cloneable
* The output contract.
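* @return These properties, if all fields of the ordering (or, without an ordering, of the partitioning)
*         are constant at the given input of the node; <code>null</code> otherwise.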
*/
public boolean filterByNodesConstantSet(OptimizerNode node, int input) {
// // check if partitioning survives
// if (partitionedFields != null) {
// for (Integer index : partitionedFields) {
// if (node.isFieldKept(input, index) == false) {
// partitionedFields = null;
// partitioning = PartitionProperty.NONE;
// }
// }
// }
//
// // check, whether the global order is preserved
// if (ordering != null) {
// ArrayList<Integer> involvedIndexes = ordering.getInvolvedIndexes();
// for (int i = 0; i < involvedIndexes.size(); i++) {
// if (node.isFieldKept(input, i) == false) {
// ordering = ordering.createNewOrderingUpToIndex(i);
// break;
// }
// }
// }
//
// return !isTrivial();
return false;
public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input)
{
    if (this.ordering != null) {
        // an ordering is set: only the ordered fields are examined
        for (int orderedColumn : this.ordering.getInvolvedIndexes()) {
            if (node.isFieldConstant(input, orderedColumn)) {
                continue;
            }
            // the first non-constant field invalidates the properties
            return null;
        }
        return this;
    }

    if (this.partitioningFields == null) {
        // neither ordering nor partitioning fields: nothing to check
        return this;
    }

    // no ordering: every partitioning field must be constant
    for (ColumnWithType partitionColumn : this.partitioningFields) {
        final boolean constant = node.isFieldConstant(input, partitionColumn.getColumnIndex());
        if (!constant) {
            return null;
        }
    }
    return this;
}
public GlobalProperties createInterestingGlobalProperties(OptimizerNode node, int input) {
// // check if partitioning survives
// ArrayList<Integer> newPartitionedFields = null;
// PartitionProperty newPartitioning = PartitionProperty.NONE;
// Ordering newOrdering = null;
//
// if (node instanceof UnionNode) {
// //only HashPartitioning is interesting for union nodes
// if (partitioning == PartitionProperty.HASH_PARTITIONED) {
// // fields are kept as there is no user code involved
// newPartitioning = PartitionProperty.HASH_PARTITIONED;
// newPartitionedFields = partitionedFields;
// }
// }
// else {
// if (partitionedFields != null) {
// for (Integer index : partitionedFields) {
// if (node.isFieldKept(input, index) == true) {
// if (newPartitionedFields == null) {
// newPartitioning = this.partitioning;
// newPartitionedFields = new ArrayList<Integer>();
// }
// newPartitionedFields.add(index);
// }
// }
// }
//
// // check, whether the global order is preserved
// if (ordering != null) {
// boolean orderingPreserved = true;
// ArrayList<Integer> involvedIndexes = ordering.getInvolvedIndexes();
// for (int i = 0; i < involvedIndexes.size(); i++) {
// if (node.isFieldKept(input, i) == false) {
// orderingPreserved = false;
// break;
// }
// }
//
// if (orderingPreserved) {
// newOrdering = ordering.clone();
// }
// }
// }
//
// if (newPartitioning == PartitionProperty.NONE && newOrdering == null) {
return null;
// } else {
// FieldList partitionFields = new FieldList();
// partitionFields.addAll(newPartitionedFields);
// return new GlobalProperties(newPartitioning, newOrdering, partitionFields);
// }
/**
 * Derives the part of these properties whose fields are constant at the given input of the node.
 * <p>
 * With an ordering set, the result is this object if every ordered field is constant, and
 * <code>null</code> otherwise. Without an ordering but with partitioning fields, the result is this
 * object if all fields are constant, a new object restricted to the constant fields if only some are,
 * and <code>null</code> if none is. With neither, this object is returned.
 *
 * @param node The node whose constant fields are consulted.
 * @param input The index of the node's input that is checked.
 * @return The resulting properties, or <code>null</code>, as described above.
 */
public GlobalProperties createInterestingGlobalPropertiesTopDownSubset(OptimizerNode node, int input)
{
    // an ordering is either preserved completely or not at all
    if (this.ordering != null) {
        for (int orderedColumn : this.ordering.getInvolvedIndexes()) {
            if (!node.isFieldConstant(input, orderedColumn)) {
                return null;
            }
        }
        return this;
    }

    if (this.partitioningFields == null) {
        return this;
    }

    // first pass: count how many of the partitioning fields are constant
    int totalFields = 0;
    int constantFields = 0;
    for (ColumnWithType column : this.partitioningFields) {
        totalFields++;
        if (node.isFieldConstant(input, column.getColumnIndex())) {
            constantFields++;
        }
    }

    if (constantFields == totalFields) {
        return this;
    }
    if (constantFields == 0) {
        return null;
    }

    // second pass: only some fields are constant, keep exactly those
    final OptimizerFieldSet remaining = new OptimizerFieldSet();
    for (ColumnWithType column : this.partitioningFields) {
        if (node.isFieldConstant(input, column.getColumnIndex())) {
            remaining.add(column);
        }
    }
    return new GlobalProperties(this.partitioning, remaining);
}
/**
......@@ -203,33 +194,44 @@ public final class GlobalProperties implements Cloneable
* The properties for which to check whether they meet these properties.
* @return True, if the properties are met, false otherwise.
*/
public boolean isMetBy(GlobalProperties other) {
// if (this.partitioning != PartitionProperty.NONE) {
// if (this.partitioning == PartitionProperty.ANY) {
// if (other.partitioning == PartitionProperty.NONE) {
// return false;
// }
// } else if (other.partitioning != this.partitioning) {
// return false;
// }
// }
//
// FieldList otherPartitionedFields = other.getPartitionedFields();
// if (this.partitionedFields != null) {
// if (other.partitionedFields == null) {
// return false;
// }
// if(!otherPartitionedFields.containsAll(this.partitionedFields)) {
// return false;
// }
// }
//
// if (this.ordering != null && this.ordering.isMetBy(other.getOrdering()) == false) {
// return false;
// }
//
// return true;
return false;
public boolean isMetBy(GlobalProperties other)
{
    // no partitioning set on this object: met by anything
    if (this.partitioning == PartitionProperty.NONE) {
        return true;
    }

    // ANY accepts every partitioning except NONE, all other types must match exactly
    final boolean typeCompatible = (this.partitioning == PartitionProperty.ANY)
            ? other.partitioning != PartitionProperty.NONE
            : other.partitioning == this.partitioning;
    if (!typeCompatible) {
        return false;
    }

    if (this.ordering == null) {
        // without an ordering, the partitioning fields decide
        if (this.partitioningFields == null) {
            throw new RuntimeException("Found a partitioning property, but no fields.");
        }
        return this.partitioningFields.isValidSubset(other.partitioningFields);
    }

    if (other.ordering == null) {
        throw new CompilerException("BUG: Equal partitioning property, ordering not equally set.");
    }
    if (!this.ordering.getInvolvedIndexes().isValidSubset(other.ordering.getInvolvedIndexes())) {
        return false;
    }

    // the fields are compatible, now compare the directions position by position
    for (int pos = 0; pos < other.ordering.getNumberOfFields(); pos++) {
        final Order ownOrder = this.ordering.getOrder(pos);
        final Order otherOrder = other.ordering.getOrder(pos);
        if (ownOrder == Order.NONE) {
            continue;
        }
        if (otherOrder == Order.NONE || (ownOrder != Order.ANY && ownOrder != otherOrder)) {
            return false;
        }
    }
    return true;
}
// ------------------------------------------------------------------------
......@@ -242,9 +244,9 @@ public final class GlobalProperties implements Cloneable
public int hashCode() {
final int prime = 31;
int result = 1;
// result = prime * result + ((partitioning == null) ? 0 : partitioning.hashCode());
// result = prime * result + ((partitionedFields == null) ? 0 : partitionedFields.hashCode());
// result = prime * result + ((ordering == null) ? 0 : ordering.hashCode());
result = prime * result + ((partitioning == null) ? 0 : partitioning.hashCode());
result = prime * result + ((partitioningFields == null) ? 0 : partitioningFields.hashCode());
result = prime * result + ((ordering == null) ? 0 : ordering.hashCode());
return result;
}
......@@ -254,22 +256,15 @@ public final class GlobalProperties implements Cloneable
*/
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
} else if (obj == null) {
return false;
} else if (getClass() != obj.getClass()) {
if (obj != null && obj instanceof GlobalProperties) {
GlobalProperties other = (GlobalProperties) obj;
return (ordering == other.getOrdering() || (ordering != null && ordering.equals(other.getOrdering())))
&& (partitioning == other.getPartitioning())
&& (partitioningFields == other.partitioningFields ||
(partitioningFields != null && partitioningFields.equals(other.getPartitionedFields())));
} else {
return false;
}
// GlobalProperties other = (GlobalProperties) obj;
// if ((ordering == other.getOrdering() || (ordering != null && ordering.equals(other.getOrdering())))
// && partitioning == other.getPartitioning() && partitionedFields != null
// && partitionedFields.equals(other.getPartitionedFields())) {
// return true;
// } else {
return false;
// }
}
/*
......@@ -278,31 +273,18 @@ public final class GlobalProperties implements Cloneable
*/
@Override
public String toString() {
return "GlobalProperties [partitioning=" + partitioning + " on fields=" // + partitionedFields + ", ordering="
+ ordering + "]";
return "GlobalProperties [partitioning=" + partitioning +
(this.partitioningFields == null ? "" : ", on fields " + this.partitioningFields) +
(this.ordering == null ? "" : ", with ordering " + this.ordering) + "]";
}
/*
* (non-Javadoc)
* @see java.lang.Object#clone()
*/
public GlobalProperties clone() throws CloneNotSupportedException {
GlobalProperties newProps = (GlobalProperties) super.clone();
if (this.ordering != null) {
newProps.ordering = this.ordering.clone();
}
return newProps;
}
/**
* Convenience method to create copies without the cloning exception.
*
* @return A perfect deep copy of this object.
*/
public final GlobalProperties createCopy() {
public GlobalProperties clone() {
try {
return this.clone();
return (GlobalProperties) super.clone();
} catch (CloneNotSupportedException cnse) {
// should never happen, but propagate just in case
throw new RuntimeException(cnse);
......
......@@ -30,7 +30,7 @@ public final class LocalProperties implements Cloneable
{
private Ordering ordering; // order inside a partition, null if not ordered
private FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
private OptimizerFieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
private FieldSet uniqueFields; // fields whose value combination is unique in the stream
......@@ -41,27 +41,18 @@ public final class LocalProperties implements Cloneable
*/
public LocalProperties() {}
/**
* Creates a new instance of local properties that have only the given ordering as property.
* The ordering does automatically imply a grouping, though.
*
* @param ordering The ordering represented by these local properties.
*/
public LocalProperties(Ordering ordering) {
this.ordering = ordering;
this.groupedFields = ordering.getInvolvedIndexes();
}
/**
* Creates a new instance of local properties that have the given ordering as property,
* field grouping and field uniqueness. Any of the given parameters may be null. Beware, though,
* that a null grouping is inconsistent with a non-null ordering.
* <p>
* This constructor is used only for internal copy creation.
*
* @param ordering The ordering represented by these local properties.
* @param groupedFields The grouped fields for these local properties.
* @param uniqueFields The unique fields for these local properties.
*/
public LocalProperties(Ordering ordering, FieldSet groupedFields, FieldSet uniqueFields) {
private LocalProperties(Ordering ordering, OptimizerFieldSet groupedFields, FieldSet uniqueFields) {
this.ordering = ordering;
this.groupedFields = groupedFields;
this.uniqueFields = uniqueFields;
......@@ -93,7 +84,7 @@ public final class LocalProperties implements Cloneable
*
* @return The grouped fields, or <code>null</code> if nothing is grouped.
*/
public FieldSet getGroupedFields() {
public OptimizerFieldSet getGroupedFields() {
return this.groupedFields;
}
......@@ -102,7 +93,7 @@ public final class LocalProperties implements Cloneable
*
* @param groupedFields The fields that are grouped in these data properties.
*/
public void setGroupedFields(FieldSet groupedFields) {
public void setGroupedFields(OptimizerFieldSet groupedFields) {
this.groupedFields = groupedFields;
}
......@@ -155,24 +146,32 @@ public final class LocalProperties implements Cloneable
*
* @return These properties if nothing had to be removed, <code>null</code> if no property remains,
*         or a new instance holding the remaining properties.
*/
public boolean filterByNodesConstantSet(OptimizerNode node, int input) {
public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input)
{
// check, whether the local order is preserved
Ordering no = this.ordering;
FieldSet ngf = this.groupedFields;
FieldSet nuf = this.uniqueFields;
if (this.ordering != null) {
FieldList involvedIndexes = this.ordering.getInvolvedIndexes();
for (int i = 0; i < involvedIndexes.size(); i++) {
if (!node.isFieldConstant(input, involvedIndexes.get(i))) {
this.ordering = this.ordering.createNewOrderingUpToIndex(i);
if (i == 0) {
no = null;
ngf = null;
} else {
no = this.ordering.createNewOrderingUpToIndex(i);
ngf = no.getInvolvedIndexes();
}
break;
}
}
}
// check, whether the local key grouping is preserved
if (this.groupedFields != null) {
} else if (this.groupedFields != null) {
// check, whether the local key grouping is preserved
for (Integer index : this.groupedFields) {
if (!node.isFieldConstant(input, index)) {
this.groupedFields = null;
break;
ngf = null;
}
}
}
......@@ -181,65 +180,15 @@ public final class LocalProperties implements Cloneable
if (this.uniqueFields != null) {
for (Integer index : this.uniqueFields) {
if (!node.isFieldConstant(input, index)) {
this.uniqueFields = null;
nuf = null;
break;
}
}
}
return !isTrivial();
}
public LocalProperties createInterestingLocalProperties(OptimizerNode node, int input)
{
// // check, whether the local order is preserved
// boolean newGrouped = false;
// Ordering newOrdering = null;
// FieldSet newGroupedFields = null;
//
//
// // no interesting LocalProperties for input of Unions
// if (node instanceof UnionNode) return null;
//
//
// // check, whether the local key grouping is preserved
// if (this.groupedFields != null) {
// boolean groupingPreserved = true;
// for (Integer index : this.groupedFields) {
// if (node.isFieldKept(input, index) == false) {
// groupingPreserved = false;
// break;
// }
// }
//
// if (groupingPreserved) {
// newGroupedFields = (FieldSet) this.groupedFields.clone();
// newGrouped = true;
// }
// }
//
// // check, whether the global order is preserved
// if (ordering != null) {
// boolean orderingPreserved = true;
// ArrayList<Integer> involvedIndexes = ordering.getInvolvedIndexes();
// for (int i = 0; i < involvedIndexes.size(); i++) {
// if (node.isFieldKept(input, i) == false) {
// orderingPreserved = false;
// break;
// }
// }
//
// if (orderingPreserved) {
// newOrdering = ordering.clone();
// }
// }
//
// if (newGrouped == false && newOrdering == null) {
return null;
// }
// else {
// return new LocalProperties(newGrouped, newGroupedFields, newOrdering);
// }
return (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) ? this :
(no == null && ngf == null && nuf == null) ? null :
new LocalProperties(no, ngf, nuf);
}
/**
......
......@@ -46,7 +46,7 @@ import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.util.PactConfigConstants;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.plan.CoGroupNode;
import eu.stratosphere.pact.compiler.plan.CrossNode;
import eu.stratosphere.pact.compiler.plan.DataSinkNode;
......@@ -363,7 +363,7 @@ public class PactCompiler {
* The address of the job manager (to obtain system characteristics) is determined via the global configuration.
*/
public PactCompiler() {
this(null, new FixedSizeClusterCostEstimator());
this(null, new DefaultCostEstimator());
}
/**
......@@ -378,7 +378,7 @@ public class PactCompiler {
* The statistics to be used to determine the input properties.
*/
public PactCompiler(DataStatistics stats) {
this(stats, new FixedSizeClusterCostEstimator());
this(stats, new DefaultCostEstimator());
}
/**
......
......@@ -15,53 +15,39 @@
package eu.stratosphere.pact.compiler.costs;
import java.util.List;
import java.util.Iterator;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.PactConnection;
import eu.stratosphere.pact.runtime.resettable.BlockResettableMutableObjectIterator;
import eu.stratosphere.pact.compiler.plan.EstimateProvider;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
/**
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
* @author Stephan Ewen
*/
public abstract class CostEstimator {
public abstract void getRangePartitionCost(PactConnection conn, Costs costs);
public abstract void addRangePartitionCost(EstimateProvider estimates, Costs costs);
public abstract void getHashPartitioningCost(PactConnection conn, Costs costs);
public abstract void addHashPartitioningCost(EstimateProvider estimates, Costs costs);
public abstract void getBroadcastCost(PactConnection conn, Costs costs);
public abstract void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs);
// ------------------------------------------------------------------------
public abstract void getLocalSortCost(OptimizerNode node, PactConnection input, Costs costs);
public abstract void getLocalDoubleSortMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2,
Costs costs);
public abstract void getLocalSingleSortMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2,
Costs costs);
public abstract void addLocalSortCost(EstimateProvider estimates, long memorySize, Costs costs);
public abstract void getLocalMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2,
Costs costs);
public abstract void addLocalMergeCost(EstimateProvider estimates1, EstimateProvider estimates2, long memorySize, Costs costs);
public abstract void getLocalSortSelfNestedLoopCost(OptimizerNode node, PactConnection input, int bufferSize, Costs costs);
public abstract void getLocalSelfNestedLoopCost(OptimizerNode node, PactConnection input, int bufferSize, Costs costs);
public abstract void addLocalSelfNestedLoopCost(EstimateProvider estimates, long bufferSize, Costs costs);
public abstract void getHybridHashCosts(OptimizerNode node, PactConnection buildSideInput,
PactConnection probeSideInput, Costs costs);
public abstract void addHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, long memorySize, Costs costs);
public abstract void getMainMemHashCosts(OptimizerNode node, PactConnection buildSideInput,
PactConnection probeSideInput, Costs costs);
public abstract void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs);
public abstract void getStreamedNestedLoopsCosts(OptimizerNode node, PactConnection outerSide,
PactConnection innerSide, int bufferSize, Costs costs);
public abstract void getBlockNestedLoopsCosts(OptimizerNode node, PactConnection outerSide, PactConnection innerSide,
int blockSize, Costs costs);
public abstract void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs);
// ------------------------------------------------------------------------
......@@ -69,126 +55,106 @@ public abstract class CostEstimator {
* This method computes the costs for an operator. It requires that all inputs are set and have a proper
* <tt>ShipStrategy</tt> set, which is not equal to <tt>NONE</tt>.
*
* @param n
* The node to compute the costs for.
* @param n The node to compute the costs for.
*/
public void costOperator(OptimizerNode n) {
if (n.getIncomingConnections() == null) {
throw new CompilerException("Cannot compute costs on operator before incoming connections are set.");
}
public void costOperator(PlanNode n)
{
// initialize costs objects with currently unknown costs
Costs globCost = new Costs();
Costs locCost = new Costs();
globCost.setNetworkCost(0);
globCost.setSecondaryStorageCost(0);
final Costs costs = new Costs();
final long availableMemory = n.getTotalAvailableMemory();
List<PactConnection> incomingConnections = n.getIncomingConnections();
for (int i = 0; i < incomingConnections.size(); i++) {
PactConnection connection = incomingConnections.get(i);
// add the shipping strategy costs
for (Iterator<Channel> channels = n.getInputs(); channels.hasNext(); ) {
final Channel channel = channels.next();
Costs tempGlobalCost = new Costs();
switch (connection.getShipStrategy().type()) {
switch (channel.getShipStrategy()) {
case NONE:
throw new CompilerException(
"Cannot determine costs: Shipping strategy has not been set for an input.");
case FORWARD:
case PARTITION_LOCAL_HASH:
tempGlobalCost.setNetworkCost(0);
tempGlobalCost.setSecondaryStorageCost(0);
break;
case PARTITION_HASH:
getHashPartitioningCost(connection, tempGlobalCost);
addHashPartitioningCost(channel, costs);
break;
case PARTITION_RANGE:
getRangePartitionCost(connection, tempGlobalCost);
addRangePartitionCost(channel, costs);
break;
case BROADCAST:
getBroadcastCost(connection, tempGlobalCost);
addBroadcastCost(channel, channel.getReplicationFactor(), costs);
break;
case SFR:
throw new CompilerException("Symmetric-Fragment-And-Replicate Strategy currently not supported.");
default:
throw new CompilerException("Unknown shipping strategy for input: " + connection.getShipStrategy().name());
throw new CompilerException("Unknown shipping strategy for input: " + channel.getShipStrategy());
}
globCost.addCosts(tempGlobalCost);
}
PactConnection primConn = null;
PactConnection secConn = null;
Channel firstInput = null;
Channel secondInput = null;
// get the inputs, if we have some
{
if (incomingConnections.size() > 0) {
primConn = incomingConnections.get(0);
}
if (incomingConnections.size() > 1) {
secConn = incomingConnections.get(1);
}
Iterator<Channel> channels = n.getInputs();
if (channels.hasNext())
firstInput = channels.next();
if (channels.hasNext())
secondInput = channels.next();
}
// determine the local costs
locCost.setNetworkCost(0);
switch (n.getLocalStrategy()) {
case NONE:
locCost.setNetworkCost(0);
locCost.setSecondaryStorageCost(0);
break;
case COMBININGSORT:
case SORT:
getLocalSortCost(n, primConn, locCost);
addLocalSortCost(firstInput, availableMemory, costs);
break;
case SORT_BOTH_MERGE:
getLocalDoubleSortMergeCost(n, primConn, secConn, locCost);
addLocalSortCost(firstInput, availableMemory / 2, costs);
addLocalSortCost(secondInput, availableMemory / 2, costs);
addLocalMergeCost(firstInput, secondInput, 0, costs);
break;
case SORT_FIRST_MERGE:
getLocalSingleSortMergeCost(n, primConn, secConn, locCost);
addLocalSortCost(firstInput, availableMemory, costs);
addLocalMergeCost(firstInput, secondInput, 0, costs);
break;
case SORT_SECOND_MERGE:
getLocalSingleSortMergeCost(n, secConn, primConn, locCost);
addLocalSortCost(secondInput, availableMemory, costs);
addLocalMergeCost(firstInput, secondInput, 0, costs);
break;
case MERGE:
getLocalMergeCost(n, primConn, secConn, locCost);
addLocalMergeCost(firstInput, secondInput, 0, costs);
break;
case SORT_SELF_NESTEDLOOP:
getLocalSortSelfNestedLoopCost(n, primConn, 10, locCost);
addLocalSortCost(firstInput, availableMemory, costs);
addLocalSelfNestedLoopCost(firstInput, 10, costs);
break;
case SELF_NESTEDLOOP:
getLocalSelfNestedLoopCost(n, primConn, 10, locCost);
addLocalSelfNestedLoopCost(firstInput, 10, costs);
break;
case HYBRIDHASH_FIRST:
getHybridHashCosts(n, primConn, secConn, locCost);
addHybridHashCosts(firstInput, secondInput, availableMemory, costs);
break;
case HYBRIDHASH_SECOND:
getHybridHashCosts(n, secConn, primConn, locCost);
addHybridHashCosts(secondInput, firstInput, availableMemory, costs);
break;
case NESTEDLOOP_BLOCKED_OUTER_FIRST:
getBlockNestedLoopsCosts(n, primConn, secConn, BlockResettableMutableObjectIterator.MIN_BUFFER_SIZE, locCost);
addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, costs);
break;
case NESTEDLOOP_BLOCKED_OUTER_SECOND:
getBlockNestedLoopsCosts(n, secConn, primConn, BlockResettableMutableObjectIterator.MIN_BUFFER_SIZE, locCost);
addBlockNestedLoopsCosts(secondInput, firstInput, availableMemory, costs);
break;
case NESTEDLOOP_STREAMED_OUTER_FIRST:
getStreamedNestedLoopsCosts(n, primConn, secConn,
128 * 1024, locCost);
addStreamedNestedLoopsCosts(firstInput, secondInput, availableMemory, costs);
break;
case NESTEDLOOP_STREAMED_OUTER_SECOND:
getStreamedNestedLoopsCosts(n, secConn, primConn,
128 * 1024, locCost);
addStreamedNestedLoopsCosts(secondInput, firstInput, availableMemory, costs);
break;
default:
throw new CompilerException("Unknown local strategy: " + n.getLocalStrategy().name());
}
// add the costs and set them
globCost.addCosts(locCost);
n.setCosts(globCost);
n.setCosts(costs);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler.costs;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.plan.EstimateProvider;
/**
* A default cost estimator that has access to basic size and cardinality estimates.
* <p>
* For robustness reasons, we always assume that the whole data is shipped during a repartition step. We deviate from
* the typical estimate of <code>(n - 1) / n</code> (with <i>n</i> being the number of nodes), because for a parallelism
* of 1, that would yield a shipping of zero bytes. While this is usually correct, the runtime scheduling may still
* choose to move tasks to different nodes, so that we do not know that no data is shipped.
*
* @author Stephan Ewen
*/
public class DefaultCostEstimator extends CostEstimator
{
	/** Value used by the estimates and by the costs to signal "not known". */
	private static final long UNKNOWN = -1;

	/** Loop count assumed for the self nested loop when the real count cannot be derived. */
	private static final long DEFAULT_SELF_NESTED_LOOPS = 10;

	/** Loop count assumed for the block nested loops when the real count cannot be derived. */
	private static final long DEFAULT_BLOCK_NESTED_LOOPS = 1000;

	/**
	 * Adds the costs of range partitioning the given data: the data itself is shipped, and in
	 * addition a sample of 10% of the data is shipped and sorted on disk.
	 *
	 * @param estimates The provider of the size estimate for the data to be partitioned.
	 * @param costs The costs object that the partitioning costs are added to. Both its network and
	 *              secondary storage costs are set to unknown if the data size is not known.
	 */
	@Override
	public void addRangePartitionCost(EstimateProvider estimates, Costs costs)
	{
		final long dataSize = estimates.getEstimatedOutputSize();
		if (dataSize < 0) {
			// no costs known
			costs.setNetworkCost(UNKNOWN);
			costs.setSecondaryStorageCost(UNKNOWN);
			return;
		}

		// Assume sampling of 10% of the data and spilling it to disk.
		// Integer arithmetic keeps the result exact for large sizes.
		final long sampled = dataSize / 10;

		// shipping costs: the data itself plus the sample
		costs.addNetworkCost(dataSize + sampled);

		// we assume a two phase merge sort of the sample, so all in all 2 I/O operations per block
		costs.addSecondaryStorageCost(2 * sampled);
	}

	/**
	 * Adds the costs of hash partitioning the given data. As a conservative estimate, the whole
	 * data is assumed to be shipped over the network. No disk costs are added.
	 *
	 * @param estimates The provider of the size estimate for the data to be partitioned.
	 * @param costs The costs object that the network costs are added to.
	 */
	@Override
	public void addHashPartitioningCost(EstimateProvider estimates, Costs costs) {
		final long shippedSize = estimates.getEstimatedOutputSize();
		costs.addNetworkCost(shippedSize < 0 ? UNKNOWN : shippedSize);
	}

	/**
	 * Adds the costs of broadcasting the given data, assuming that the whole data is shipped
	 * once per replica.
	 *
	 * @param estimates The provider of the size estimate for the data to be broadcasted.
	 * @param replicationFactor The number of times the data is shipped.
	 * @param costs The costs object that the network costs are added to. Its network cost is set
	 *              to unknown if the data size is not known.
	 */
	@Override
	public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
		final long dataSize = estimates.getEstimatedOutputSize();
		if (dataSize < 0) {
			costs.setNetworkCost(UNKNOWN);
		} else {
			costs.addNetworkCost(replicationFactor * dataSize);
		}
	}

	/**
	 * Adds the costs of a local sort. A two phase merge sort is assumed, i.e. two I/O operations
	 * per block.
	 *
	 * @param estimates The provider of the size estimate for the data to be sorted.
	 * @param availableMemory The memory available to the sort. Currently not used by this estimator.
	 * @param costs The costs object that the secondary storage costs are added to.
	 */
	@Override
	public void addLocalSortCost(EstimateProvider estimates, long availableMemory, Costs costs) {
		final long dataSize = estimates.getEstimatedOutputSize();
		costs.addSecondaryStorageCost(dataSize < 0 ? UNKNOWN : 2 * dataSize);
	}

	/**
	 * Adds the costs of a local merge. This estimator adds nothing for a merge.
	 *
	 * @param input1 The estimates for the first input. Not used.
	 * @param input2 The estimates for the second input. Not used.
	 * @param availableMemory The memory available to the merge. Not used.
	 * @param costs The costs object, left unchanged.
	 */
	@Override
	public void addLocalMergeCost(EstimateProvider input1, EstimateProvider input2, long availableMemory, Costs costs) {
		// intentionally empty: no costs are attributed to the merge itself
	}

	/**
	 * Adds the costs of a local self nested loop: the input is read once per loop, plus two
	 * additional passes.
	 *
	 * @param estimates The provider of the size and record count estimates for the input.
	 * @param bufferSize The buffer size by which the record count is divided to obtain the number
	 *                   of loops. If it is not positive, a default loop count is assumed.
	 * @param costs The costs object that the secondary storage costs are added to.
	 */
	@Override
	public void addLocalSelfNestedLoopCost(EstimateProvider estimates, long bufferSize, Costs costs) {
		final long inputSize = estimates.getEstimatedOutputSize();
		final long numRecords = estimates.getEstimatedNumRecords();

		// a non-positive buffer size would cause a division by zero or a negative loop count
		final long loops = (numRecords < 0 || bufferSize <= 0)
				? DEFAULT_SELF_NESTED_LOOPS : numRecords / bufferSize;

		costs.addSecondaryStorageCost(inputSize < 0 ? UNKNOWN : (loops + 2) * inputSize);
	}

	/**
	 * Adds the costs of a hybrid hash join, estimated as the sum of the sizes of both inputs.
	 *
	 * @param buildSideInput The estimates for the build side.
	 * @param probeSideInput The estimates for the probe side.
	 * @param availableMemory The memory available to the join. Currently not used by this estimator.
	 * @param costs The costs object that the secondary storage costs are added to.
	 */
	@Override
	public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, long availableMemory, Costs costs) {
		final long buildSize = buildSideInput.getEstimatedOutputSize();
		final long probeSize = probeSideInput.getEstimatedOutputSize();

		// half the table has to spill, times 2 I/O
		costs.addSecondaryStorageCost(buildSize < 0 || probeSize < 0 ? UNKNOWN : buildSize + probeSize);
	}

	/**
	 * Adds the costs of a streamed nested loops join. If the inner side fits into the buffer, no
	 * costs are added. Otherwise the inner side is read once per record of the outer side.
	 *
	 * @param outerSide The estimates for the outer (streamed) side.
	 * @param innerSide The estimates for the inner (repeatedly read) side.
	 * @param bufferSize The size of the buffer available for caching the inner side.
	 * @param costs The costs object that the secondary storage costs are added to.
	 */
	@Override
	public void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs) {
		final long innerSize = innerSide.getEstimatedOutputSize();
		final long outerRecords = outerSide.getEstimatedNumRecords();

		if (innerSize < 0) {
			// without a size estimate we cannot tell whether the inner side can be cached,
			// so the costs are unknown rather than zero
			costs.addSecondaryStorageCost(UNKNOWN);
			return;
		}

		// check whether the inner side can be cached
		if (innerSize > bufferSize) {
			costs.addSecondaryStorageCost(outerRecords >= 0 ? outerRecords * innerSize : UNKNOWN);
		}
	}

	/**
	 * Adds the costs of a block nested loops join: the inner side is read once per block of the
	 * outer side, but at least once.
	 *
	 * @param outerSide The estimates for the outer (blocked) side.
	 * @param innerSide The estimates for the inner (repeatedly read) side.
	 * @param blockSize The size of a block of the outer side. If it is not positive, a default loop
	 *                  count is assumed.
	 * @param costs The costs object that the secondary storage costs are added to.
	 */
	@Override
	public void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs) {
		final long innerSize = innerSide.getEstimatedOutputSize();
		final long outerSize = outerSide.getEstimatedOutputSize();

		// a non-positive block size would cause a division by zero or a negative loop count
		final long loops = (outerSize < 0 || blockSize <= 0)
				? DEFAULT_BLOCK_NESTED_LOOPS : Math.max(outerSize / blockSize, 1);

		costs.addSecondaryStorageCost(innerSize < 0 ? UNKNOWN : loops * innerSize);
	}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler.costs;
import eu.stratosphere.pact.common.contract.DataDistribution;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.PactConnection;
/**
* A cost estimator that assumes a given number of nodes in order to estimate the costs of
* shipping strategies.
* <p>
* For robustness reasons, we always assume that the whole data is shipped during a repartition step. We deviate from
* the typical estimate of <code>(n - 1) / n</code> (with <i>n</i> being the number of nodes), because for a parallelism
* of 1, that would yield a shipping of zero bytes. While this is usually correct, the runtime scheduling may still
* choose to move tasks to different nodes, so that we do not know that no data is shipped.
*
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
*/
public class FixedSizeClusterCostEstimator extends CostEstimator {

	/** Value used by the estimates and by the costs to signal "not known". */
	private static final long UNKNOWN = -1;

	/**
	 * Creates a new cost estimator. The estimator has no state of its own; the replication
	 * factors are taken from the connections handed to the individual methods.
	 */
	public FixedSizeClusterCostEstimator() {
	}

	/**
	 * Computes the size of the data shipped over a connection, i.e. the estimated output size of
	 * the connection's source scaled by the connection's replication factor.
	 * <p>
	 * The unknown check happens BEFORE scaling: multiplying the <code>-1</code> marker with a
	 * replication factor would otherwise turn it into a different value that is no longer
	 * recognized as unknown.
	 *
	 * @param conn The connection to compute the shipped size for.
	 * @return The shipped size, or <code>-1</code> if the source's output size is unknown.
	 */
	private static long shippedSize(PactConnection conn) {
		final long size = conn.getSourcePact().getEstimatedOutputSize();
		return size < 0 ? UNKNOWN : conn.getReplicationFactor() * size;
	}

	/**
	 * Sets the costs of range partitioning the data of the given connection. The shipped size
	 * is the replicated output size plus 10% for sampling.
	 *
	 * @param conn The connection whose data is range partitioned.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getRangePartitionCost(PactConnection conn, Costs costs) {
		final long outputSize = conn.getSourcePact().getEstimatedOutputSize();
		if (outputSize < 0) {
			// no costs known
			costs.setNetworkCost(UNKNOWN);
			costs.setSecondaryStorageCost(UNKNOWN);
			return;
		}

		// TODO: once a data distribution becomes available, no extra sampling has to be done.
		// In that case the costs are the same as for hash partitioning (network cost = shipped
		// size, secondary storage cost = 0).

		// Assume sampling of 10% of the data
		final long estOutShipSize = (long) (conn.getReplicationFactor() * outputSize * 1.1);
		// set shipping costs
		costs.setNetworkCost((long) (1.5f * estOutShipSize));
		// we assume a two phase merge sort, so all in all 2 I/O operations per block
		costs.setSecondaryStorageCost(2 * estOutShipSize);
	}

	/**
	 * Sets the costs of hash partitioning the data of the given connection. As a conservative
	 * estimate, the whole data is assumed to be shipped over the network. There are no disk costs.
	 *
	 * @param conn The connection whose data is hash partitioned.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getHashPartitioningCost(PactConnection conn, Costs costs) {
		costs.setNetworkCost(shippedSize(conn));
		costs.setSecondaryStorageCost(0);
	}

	/**
	 * Sets the costs of broadcasting the data of the given connection, assuming the whole data is
	 * shipped once per replica. A replication factor below one is replaced by 100.
	 *
	 * @param conn The connection whose data is broadcasted.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getBroadcastCost(PactConnection conn, Costs costs) {
		final int replicationFactor = conn.getReplicationFactor() < 1 ? 100 : conn.getReplicationFactor();
		final long outputSize = conn.getSourcePact().getEstimatedOutputSize();

		// check for the unknown marker before scaling it with the replication factor
		costs.setNetworkCost(outputSize < 0 ? UNKNOWN : replicationFactor * outputSize);
		// no disk costs.
		costs.setSecondaryStorageCost(0);
	}

	/**
	 * Sets the costs of locally sorting the data of the given connection. A two phase merge sort
	 * is assumed, i.e. two I/O operations per block.
	 *
	 * @param node The node performing the sort. Not used.
	 * @param input The connection delivering the data to be sorted.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getLocalSortCost(OptimizerNode node, PactConnection input, Costs costs) {
		costs.setNetworkCost(0);

		final long size = shippedSize(input);
		costs.setSecondaryStorageCost(size < 0 ? UNKNOWN : 2 * size);
	}

	/**
	 * Sets the costs of a sort-merge strategy where both inputs have to be sorted. A two phase
	 * merge sort is assumed for both sides, i.e. two I/O operations per block.
	 *
	 * @param node The node performing the sort-merge. Not used.
	 * @param input1 The connection delivering the first input.
	 * @param input2 The connection delivering the second input.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getLocalDoubleSortMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2, Costs costs) {
		costs.setNetworkCost(0);

		final long size1 = shippedSize(input1);
		final long size2 = shippedSize(input2);
		costs.setSecondaryStorageCost(size1 < 0 || size2 < 0 ? UNKNOWN : 2 * (size1 + size2));
	}

	/**
	 * Sets the costs of a sort-merge strategy where only one input has to be sorted. A two phase
	 * merge sort is assumed for the unsorted input, i.e. two I/O operations per block.
	 *
	 * @param node The node performing the sort-merge. Not used.
	 * @param unsortedInput The connection delivering the input that must be sorted.
	 * @param sortedInput The connection delivering the already sorted input. Not used.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getLocalSingleSortMergeCost(OptimizerNode node, PactConnection unsortedInput, PactConnection sortedInput, Costs costs) {
		costs.setNetworkCost(0);

		final long size = shippedSize(unsortedInput);
		costs.setSecondaryStorageCost(size < 0 ? UNKNOWN : 2 * size);
	}

	/**
	 * Sets the costs of merging two sorted inputs. Both the network and the secondary storage
	 * costs are zero.
	 *
	 * @param node The node performing the merge. Not used.
	 * @param input1 The connection delivering the first input. Not used.
	 * @param input2 The connection delivering the second input. Not used.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getLocalMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2, Costs costs) {
		// inputs are sorted. No network and secondary storage costs produced
		costs.setNetworkCost(0);
		costs.setSecondaryStorageCost(0);
	}

	/**
	 * Sets the costs of a self nested loop that sorts its input first.
	 *
	 * @param node The node performing the strategy. Not used.
	 * @param input The connection delivering the input.
	 * @param bufferSize The buffer size by which the record count is divided to obtain the number
	 *                   of loops. If it is not positive, a default loop count is assumed.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getLocalSortSelfNestedLoopCost(OptimizerNode node, PactConnection input, int bufferSize, Costs costs) {
		costs.setNetworkCost(0);

		final long inputSize = shippedSize(input);
		final long numRecords = input.getSourcePact().getEstimatedNumRecords();
		// a non-positive buffer size would cause a division by zero or a negative loop count
		final long loops = (numRecords < 0 || bufferSize <= 0) ? 1000 : numRecords / bufferSize;

		// we assume a two phase merge sort, so all in all 2 I/O operations per block
		// plus I/O for the SpillingResettableIterators: 2 for writing plus reading
		costs.setSecondaryStorageCost(inputSize < 0 ? UNKNOWN : (loops + 4) * inputSize);
	}

	/**
	 * Sets the secondary storage costs of a self nested loop without sorting. The network cost
	 * of the given costs object is left untouched.
	 *
	 * @param node The node performing the strategy. Not used.
	 * @param input The connection delivering the input.
	 * @param bufferSize The buffer size by which the record count is divided to obtain the number
	 *                   of loops. If it is not positive, a default loop count is assumed.
	 * @param costs The costs object whose secondary storage costs are set.
	 */
	@Override
	public void getLocalSelfNestedLoopCost(OptimizerNode node, PactConnection input, int bufferSize, Costs costs) {
		final long inputSize = shippedSize(input);
		final long numRecords = input.getSourcePact().getEstimatedNumRecords();
		// a non-positive buffer size would cause a division by zero or a negative loop count
		final long loops = (numRecords < 0 || bufferSize <= 0) ? 10 : numRecords / bufferSize;

		costs.setSecondaryStorageCost(inputSize < 0 ? UNKNOWN : (loops + 2) * inputSize);
	}

	/**
	 * Sets the costs of a hybrid hash join, computed as twice the build side size plus the probe
	 * side size.
	 *
	 * @param node The node performing the join. Not used.
	 * @param buildSideInput The connection delivering the build side.
	 * @param probeSideInput The connection delivering the probe side.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getHybridHashCosts(OptimizerNode node, PactConnection buildSideInput, PactConnection probeSideInput,
			Costs costs) {
		costs.setNetworkCost(0);

		final long buildSize = shippedSize(buildSideInput);
		final long probeSize = shippedSize(probeSideInput);

		// we assume that the build side has to spill, which is weighted with a factor of two
		// NOTE: This is currently artificially expensive to prevent the compiler from using the hash-strategies, which are
		// being reworked from in-memory and grace towards a gradually degrading hybrid hash join
		costs.setSecondaryStorageCost(buildSize < 0 || probeSize < 0 ? UNKNOWN : 2 * buildSize + probeSize);
	}

	/**
	 * Sets the costs of a main memory hash join. Both the network and the secondary storage
	 * costs are zero.
	 *
	 * @param node The node performing the join. Not used.
	 * @param buildSideInput The connection delivering the build side. Not used.
	 * @param probeSideInput The connection delivering the probe side. Not used.
	 * @param target The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getMainMemHashCosts(OptimizerNode node, PactConnection buildSideInput, PactConnection probeSideInput,
			Costs target) {
		target.setNetworkCost(0);
		target.setSecondaryStorageCost(0);
	}

	/**
	 * Sets the costs of a streamed nested loops join. If the inner side fits into the (replicated)
	 * buffer, it causes no disk costs. Otherwise it is read once per record of the outer side.
	 *
	 * @param node The node performing the join. Not used.
	 * @param outerSide The connection delivering the outer (streamed) side.
	 * @param innerSide The connection delivering the inner (repeatedly read) side.
	 * @param bufferSize The size of the buffer available for caching the inner side.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getStreamedNestedLoopsCosts(OptimizerNode node, PactConnection outerSide, PactConnection innerSide,
			int bufferSize, Costs costs)
	{
		costs.setNetworkCost(0);

		final long innerSize = shippedSize(innerSide);
		final long outerRecords = outerSide.getSourcePact().getEstimatedNumRecords() * outerSide.getReplicationFactor();

		if (innerSize < 0 || outerRecords < 0) {
			// without estimates we can neither tell whether the inner side can be cached,
			// nor how often it is read
			costs.setSecondaryStorageCost(UNKNOWN);
			return;
		}

		// check whether the inner side can be cached. computed as long to avoid an int overflow
		final long cacheCapacity = (long) bufferSize * innerSide.getReplicationFactor();
		costs.setSecondaryStorageCost(innerSize < cacheCapacity ? 0 : outerRecords * innerSize);
	}

	/**
	 * Sets the costs of a block nested loops join: the inner side is read once per block of the
	 * outer side, but at least once.
	 *
	 * @param node The node performing the join. Not used.
	 * @param outerSide The connection delivering the outer (blocked) side.
	 * @param innerSide The connection delivering the inner (repeatedly read) side.
	 * @param blockSize The size of a block of the outer side. If it is not positive, a default loop
	 *                  count is assumed.
	 * @param costs The costs object whose network and secondary storage costs are set.
	 */
	@Override
	public void getBlockNestedLoopsCosts(OptimizerNode node, PactConnection outerSide, PactConnection innerSide,
			int blockSize, Costs costs)
	{
		costs.setNetworkCost(0);

		final long innerSize = shippedSize(innerSide);
		final long outerSize = shippedSize(outerSide);
		// a non-positive block size would cause a division by zero or a negative loop count
		final long loops = Math.max((outerSize < 0 || blockSize <= 0) ? 1000 : outerSize / blockSize, 1);

		costs.setSecondaryStorageCost(innerSize < 0 ? UNKNOWN : loops * innerSize);
	}
}
......@@ -25,7 +25,6 @@ import eu.stratosphere.pact.common.contract.GenericDataSink;
import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.PartitionProperty;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
......@@ -228,45 +227,36 @@ public class DataSinkNode extends OptimizerNode {
// range partitioned only or global sort
// in both cases create a range partitioned only IP
InterestingProperties partitioningProps = new InterestingProperties();
partitioningProps.getGlobalProperties().setPartitioning(PartitionProperty.RANGE_PARTITIONED, partitioning);
estimator.getRangePartitionCost(this.input, partitioningProps.getMaximalCosts());
estimator.addRangePartitionCost(this.input, partitioningProps.getMaximalCosts());
this.input.addInterestingProperties(partitioningProps);
} else if (localOrder == null) {
this.input.setNoInterestingProperties();
}
if (localOrder != null) {
if (partitioning != null && localOrder.equals(partitioning)) {
// global sort case: create IP for range partitioned and sorted
InterestingProperties globalSortProps = new InterestingProperties();
globalSortProps.getGlobalProperties().setPartitioning(PartitionProperty.RANGE_PARTITIONED, partitioning);
estimator.getRangePartitionCost(this.input, globalSortProps.getMaximalCosts());
estimator.addRangePartitionCost(this.input, globalSortProps.getMaximalCosts());
globalSortProps.getLocalProperties().setOrdering(partitioning);
estimator.addLocalSortCost(this.input, -1, globalSortProps.getMaximalCosts());
Costs sortCosts = new Costs();
estimator.getLocalSortCost(this, this.input, sortCosts);
globalSortProps.getMaximalCosts().addCosts(sortCosts);
this.input.addInterestingProperties(globalSortProps);
} else {
// local order only
InterestingProperties localSortProps = new InterestingProperties();
localSortProps.getLocalProperties().setOrdering(partitioning);
estimator.addLocalSortCost(this.input, -1, localSortProps.getMaximalCosts());
this.input.addInterestingProperties(localSortProps);
}
} else {
Ordering localOrdering = getPactContract().getLocalOrder();
if (localOrdering != null && localOrdering.equals(partitioning)) {
InterestingProperties i = partitioningProps.clone();
i.getLocalProperties().setOrdering(partitioning);
this.input.addInterestingProperties(i);
}
}
else if (getPactContract().getLocalOrder() != null) {
InterestingProperties i = new InterestingProperties();
i.getLocalProperties().setOrdering(getPactContract().getLocalOrder());
estimator.getLocalSortCost(this, this.input, i.getMaximalCosts());
this.input.addInterestingProperties(i);
} else {
this.input.setNoInterestingProperties();
}
}
......
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler.plan;
import java.util.Map;
import eu.stratosphere.pact.common.util.FieldSet;
/**
 * Read-only view on size and cardinality estimates. It is the abstraction through which the
 * cost estimators obtain the estimates they base their cost computations on.
 * <p>
 * NOTE(review): the cost estimators compare the returned sizes and record counts against
 * <code>-1</code> (or test for negative values) to detect missing estimates, so a negative
 * result presumably means "unknown" - confirm against the implementing classes.
 *
 * @author Stephan Ewen
 */
public interface EstimateProvider
{
/**
 * Gets the estimated output size from this node.
 *
 * @return The estimated output size.
 */
long getEstimatedOutputSize();
/**
 * Gets the estimated number of records in the output of this node.
 *
 * @return The estimated number of records.
 */
long getEstimatedNumRecords();
/**
 * Gets the cardinality estimates, keyed by the set of fields they refer to.
 * NOTE(review): presumably the number of distinct values per field set - confirm against
 * the implementing classes.
 *
 * @return The map from field sets to their estimated cardinalities.
 */
Map<FieldSet, Long> getEstimatedCardinalities();
/**
 * Gets the estimated cardinality for a single set of fields.
 * NOTE(review): the result for a field set without an estimate is not visible from this
 * interface - verify before relying on it.
 *
 * @param cP The set of fields to get the estimated cardinality for.
 * @return The estimated cardinality of the given field set.
 */
long getEstimatedCardinality(FieldSet cP);
}
......@@ -98,17 +98,6 @@ public class InterestingProperties implements Cloneable
return globalProps;
}
/**
* Copies the maximal costs from the given costs object.
*
* @param c
* The costs object to copy.
*/
public void copyMaximalCosts(Costs c) {
	// overwrite both cost components of this object's maximal costs with the given values
	final Costs target = this.maximalCosts;
	target.setNetworkCost(c.getNetworkCost());
	target.setSecondaryStorageCost(c.getSecondaryStorageCost());
}
/**
* Checks, if the given <tt>InterestingProperties</tt> object has the same properties as this one.
* This method is a lesser version of the <code>equals(...)</code> method, as it does not take the
......@@ -133,6 +122,22 @@ public class InterestingProperties implements Cloneable
public boolean isMetBy(PlanNode node) {
	// the global properties are checked first; the local ones only if the global ones are met
	if (!this.globalProps.isMetBy(node.getGlobalProperties())) {
		return false;
	}
	return this.localProps.isMetBy(node.getLocalProperties());
}
/**
 * Filters the global and local properties of this object through the constant set of the
 * given node's input.
 *
 * @param node The node whose constant set is used for filtering.
 * @param input The index of the node's input.
 * @return This object, if filtering returned the very same global and local property objects;
 *         <code>null</code>, if filtering returned <code>null</code> for both; otherwise a new
 *         object with the filtered properties (a <code>null</code> result being replaced by
 *         default properties) that shares the maximal costs object with this one.
 */
public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) {
	final GlobalProperties filteredGlobal = this.globalProps.filterByNodesConstantSet(node, input);
	final LocalProperties filteredLocal = this.localProps.filterByNodesConstantSet(node, input);

	// filtering handed back the identical objects: nothing to rebuild
	if (filteredGlobal == this.globalProps && filteredLocal == this.localProps) {
		return this;
	}

	// neither kind of property was retained
	if (filteredGlobal == null && filteredLocal == null) {
		return null;
	}

	final GlobalProperties newGlobal = filteredGlobal != null ? filteredGlobal : new GlobalProperties();
	final LocalProperties newLocal = filteredLocal != null ? filteredLocal : new LocalProperties();
	return new InterestingProperties(this.maximalCosts, newGlobal, newLocal);
}
// ------------------------------------------------------------------------
......@@ -156,29 +161,14 @@ public class InterestingProperties implements Cloneable
*/
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
InterestingProperties other = (InterestingProperties) obj;
if (globalProps == null) {
if (other.globalProps != null)
return false;
} else if (!globalProps.equals(other.globalProps))
if (obj != null && obj instanceof InterestingProperties) {
InterestingProperties other = (InterestingProperties) obj;
return this.globalProps.equals(other.globalProps) &&
this.localProps.equals(other.localProps) &&
this.maximalCosts.equals(other.maximalCosts);
} else {
return false;
if (localProps == null) {
if (other.localProps != null)
return false;
} else if (!localProps.equals(other.localProps))
return false;
if (maximalCosts == null) {
if (other.maximalCosts != null)
return false;
} else if (!maximalCosts.equals(other.maximalCosts))
return false;
return true;
}
}
/*
......@@ -196,11 +186,9 @@ public class InterestingProperties implements Cloneable
* @see java.lang.Object#clone()
*/
@Override
public InterestingProperties clone()
{
// return new InterestingProperties(maximalCosts.createCopy(),
// globalProps.createCopy(), localProps.createCopy());
return null;
public InterestingProperties clone() {
return new InterestingProperties(this.maximalCosts.clone(),
this.globalProps.clone(), this.localProps.createCopy());
}
// ------------------------------------------------------------------------
......@@ -219,28 +207,23 @@ public class InterestingProperties implements Cloneable
* @param toMerge
* The properties that is added / merged into the previous collection.
*/
public static void mergeUnionOfInterestingProperties(List<InterestingProperties> properties,
InterestingProperties toMerge) {
boolean subsumed = false;
public static void mergeUnionOfInterestingProperties(
List<InterestingProperties> properties, InterestingProperties toMerge)
{
// go through all existing property sets
for (InterestingProperties toCheck : properties) {
for (int i = 0; i < properties.size(); i++) {
InterestingProperties toCheck = properties.get(i);
if (toCheck.hasEqualProperties(toMerge)) {
subsumed = true;
// the properties are equal. keep the one with the higher maximal cost,
// because it indicates, that the properties are worth more.
if (toMerge.getMaximalCosts().compareTo(toCheck.getMaximalCosts()) > 0) {
toCheck.copyMaximalCosts(toMerge.getMaximalCosts());
properties.set(i, toMerge);
}
break;
return;
}
}
// if it was not subsumes, add it
if (!subsumed) {
properties.add(toMerge.clone());
}
// if it was not subsumed, add it
properties.add(toMerge.clone());
}
/**
......@@ -264,74 +247,31 @@ public class InterestingProperties implements Cloneable
mergeUnionOfInterestingProperties(properties, candidate);
}
}
// /**
// * Utility method that checks, how the given interesting properties that a node receives from its
// * successors, are relevant to its predecessors. That depends, of course, on the output contract,
// * as that determines which properties can be inferred to be preserved by the node. The returned
// * set will not contain interesting properties objects that are reduced to trivial properties,
// * i.e. where all properties have the default value, such as for example <i>none</i> for the
// * partitioning.
// *
// * @param props
// * The collection of interesting properties that a node receives from its successors.
// * @param contract
// * The output contract.
// * @return A collection with the interesting properties that are relevant with respect to the given output
// * contract. Contains the same objects as in the input set, with properties accordingly restricted.
// * Returns always a modifiable collection, even if no properties are preserved.
// */
// public static final List<InterestingProperties> filterByOutputContract(List<InterestingProperties> props,
// OutputContract contract) {
// // if the output contract is NONE, it basically destroys all properties,
// // as they always refer to the key, and the key is potentially switched
// if (contract == OutputContract.None) {
// return new ArrayList<InterestingProperties>();
// } else {
// List<InterestingProperties> preserved = new ArrayList<InterestingProperties>();
//
// // process all interesting properties
// for (InterestingProperties p : props) {
// boolean nonTrivial = p.getGlobalProperties().filterByOutputContract(contract);
// nonTrivial |= p.getLocalProperties().filterByOutputContract(contract);
//
// if (nonTrivial) {
// preserved.add(p);
// }
// }
//
// return preserved;
// }
// }
public static final List<InterestingProperties> createInterestingPropertiesForInput(List<InterestingProperties> props,
OptimizerNode node, int input) {
List<InterestingProperties> preserved = new ArrayList<InterestingProperties>();
public static final List<InterestingProperties> filterInterestingPropertiesForInput(
List<InterestingProperties> props, OptimizerNode node, int input)
{
List<InterestingProperties> preserved = null;
for (InterestingProperties p : props) {
// GlobalProperties preservedGp = p.getGlobalProperties().createCopy();
// LocalProperties preservedLp = p.getLocalProperties().createCopy();
// boolean nonTrivial = preservedGp.filterByNodesConstantSet(node, input);
// nonTrivial |= preservedLp.filterByNodesConstantSet(node, input);
final InterestingProperties filteredProps = p.filterByCodeAnnotations(node, input);
if (filteredProps == null) {
continue;
}
GlobalProperties preservedGp =
p.getGlobalProperties().createInterestingGlobalProperties(node, input);
LocalProperties preservedLp =
p.getLocalProperties().createInterestingLocalProperties(node, input);
if (preservedGp != null || preservedLp != null) {
if (preservedGp == null) {
preservedGp = new GlobalProperties();
}
if (preservedLp == null) {
preservedLp = new LocalProperties();
}
InterestingProperties newIp = new InterestingProperties(p.getMaximalCosts().clone(), preservedGp, preservedLp);
mergeUnionOfInterestingProperties(preserved, newIp);
final GlobalProperties topDownAdjustedGP = filteredProps.globalProps.createInterestingGlobalPropertiesTopDownSubset(node, input);
if (topDownAdjustedGP == null && filteredProps.localProps.isTrivial())
continue;
if (preserved == null) {
preserved = new ArrayList<InterestingProperties>();
}
final InterestingProperties toAdd = topDownAdjustedGP == filteredProps.getGlobalProperties() ? filteredProps :
new InterestingProperties(filteredProps.getMaximalCosts(), topDownAdjustedGP, filteredProps.localProps);
mergeUnionOfInterestingProperties(preserved, toAdd);
}
return preserved;
}
}
......@@ -24,7 +24,7 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
* The Optimizer representation of a <i>Map</i> contract node.
*
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
* @author Stephan Ewen
*/
public class MapNode extends SingleInputNode {
......@@ -100,13 +100,12 @@ public class MapNode extends SingleInputNode {
// if so, propagate to the child.
List<InterestingProperties> thisNodesIntProps = getInterestingProperties();
List<InterestingProperties> props = InterestingProperties.createInterestingPropertiesForInput(thisNodesIntProps,
this, 0);
List<InterestingProperties> props = InterestingProperties.filterInterestingPropertiesForInput(thisNodesIntProps, this, 0);
if (!props.isEmpty()) {
this.inConn.addAllInterestingProperties(props);
} else {
if (props.isEmpty()) {
this.inConn.setNoInterestingProperties();
} else {
this.inConn.addAllInterestingProperties(props);
}
}
......
......@@ -45,7 +45,7 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
* @author Stephan Ewen
* @author Fabian Hueske
*/
public abstract class OptimizerNode implements Visitable<OptimizerNode>
public abstract class OptimizerNode implements Visitable<OptimizerNode>, EstimateProvider
{
// ------------------------------------------------------------------------
// Members
......
......@@ -19,7 +19,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
......@@ -33,7 +35,7 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
* The connections are also used by the optimization algorithm to propagate interesting properties from the sinks in the
* direction of the sources.
*/
public class PactConnection
public class PactConnection implements EstimateProvider
{
private final OptimizerNode sourcePact; // The source node of the connection
......@@ -223,6 +225,42 @@ public class PactConnection
return PactConnection.getLocalPropertiesAfterConnection(this.sourcePact, this.targetPact, this.shipStrategy);
}
// --------------------------------------------------------------------------------------------
/**
 * Reports the estimated output size for this connection, taken unchanged from the
 * connection's source node.
 *
 * @return The estimated output size of the source node.
 * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedOutputSize()
 */
@Override
public long getEstimatedOutputSize() {
	final OptimizerNode source = this.sourcePact;
	return source.getEstimatedOutputSize();
}
/**
 * Reports the estimated number of records for this connection, taken unchanged from the
 * connection's source node.
 *
 * @return The estimated number of records of the source node.
 * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedNumRecords()
 */
@Override
public long getEstimatedNumRecords() {
	final OptimizerNode source = this.sourcePact;
	return source.getEstimatedNumRecords();
}
/**
 * Reports the cardinality estimates for this connection, taken unchanged from the
 * connection's source node.
 *
 * @return The map of estimated cardinalities of the source node.
 * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedCardinalities()
 */
@Override
public Map<FieldSet, Long> getEstimatedCardinalities() {
	final OptimizerNode source = this.sourcePact;
	return source.getEstimatedCardinalities();
}
/**
 * Reports the estimated cardinality of the given field set for this connection, taken
 * unchanged from the connection's source node.
 *
 * @param cP The field set to get the estimated cardinality for.
 * @return The estimated cardinality that the source node reports for the field set.
 * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedCardinality(eu.stratosphere.pact.common.util.FieldSet)
 */
@Override
public long getEstimatedCardinality(FieldSet cP) {
	final OptimizerNode source = this.sourcePact;
	return source.getEstimatedCardinality(cP);
}
// --------------------------------------------------------------------------------------------
/*
* (non-Javadoc)
* @see java.lang.Object#toString()
......@@ -348,5 +386,4 @@ public class PactConnection
// return lp;
return null;
}
}
\ No newline at end of file
......@@ -166,39 +166,25 @@ public class ReduceNode extends SingleInputNode {
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
    // Purpose: determine which data properties of the input would be beneficial for this
    // reduce node and register them on the input connection. Two candidates are created
    // (partitioned + grouped, and partitioned only); each carries as its maximal cost what
    // the estimator adds for establishing those properties on the input connection.

    // check, if there is an output contract that tells us that certain properties are preserved.
    // if so, propagate to the child.
    List<InterestingProperties> inheritedIntProps = getInterestingProperties();
    List<InterestingProperties> props =
        InterestingProperties.filterInterestingPropertiesForInput(inheritedIntProps, this, 0);

    // add the first interesting properties: partitioned and grouped
    InterestingProperties ip1 = new InterestingProperties();
    ip1.getGlobalProperties().setPartitioning(PartitionProperty.ANY, this.keys);
    ip1.getLocalProperties().setGroupedFields(this.keys);
    estimator.addHashPartitioningCost(this.inConn, ip1.getMaximalCosts());
    // NOTE(review): the meaning of the -1 argument is defined by the estimator - confirm there
    estimator.addLocalSortCost(this.inConn, -1, ip1.getMaximalCosts());

    // add the second interesting properties: partitioned only
    InterestingProperties ip2 = new InterestingProperties();
    ip2.getGlobalProperties().setPartitioning(PartitionProperty.ANY, this.keys);
    estimator.addHashPartitioningCost(this.inConn, ip2.getMaximalCosts());

    // merge both candidates into the inherited properties and hand them to the input
    InterestingProperties.mergeUnionOfInterestingProperties(props, ip1);
    InterestingProperties.mergeUnionOfInterestingProperties(props, ip2);
    this.inConn.addAllInterestingProperties(props);
}
@Override
......
......@@ -27,8 +27,11 @@ import eu.stratosphere.pact.common.contract.SingleInputContract;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFields;
import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFieldsExcept;
import eu.stratosphere.pact.common.type.Key;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.ColumnWithType;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.OptimizerFieldSet;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
......@@ -49,7 +52,7 @@ public abstract class SingleInputNode extends OptimizerNode {
protected FieldSet constantSet; // set of fields that are left unchanged by the stub
protected FieldSet notConstantSet; // set of fields that are changed by the stub
protected final FieldSet keyList; // The set of key fields
protected final OptimizerFieldSet keys; // The set of key fields
// ------------------------------
......@@ -61,7 +64,15 @@ public abstract class SingleInputNode extends OptimizerNode {
*/
public SingleInputNode(SingleInputContract<?> pactContract) {
    super(pactContract);

    // untyped key set, built directly from the key column numbers of input 0
    this.keyList = new FieldSet(pactContract.getKeyColumnNumbers(0));

    // typed key set: every key column position is paired with the key class at the same index
    this.keys = new OptimizerFieldSet();
    final int[] keyPos = pactContract.getKeyColumnNumbers(0);
    final Class<? extends Key>[] keyTypes = pactContract.getKeyClasses();

    // without key positions the typed key set simply stays empty
    if (keyPos == null) {
        return;
    }
    for (int idx = 0, numKeys = keyPos.length; idx < numKeys; idx++) {
        final ColumnWithType typedColumn = new ColumnWithType(keyPos[idx], keyTypes[idx]);
        this.keys.add(typedColumn);
    }
}
// /**
......@@ -198,8 +209,8 @@ public abstract class SingleInputNode extends OptimizerNode {
*
* @return The key fields of this optimizer node.
*/
public FieldSet getKeySet() {
return this.keyList;
public OptimizerFieldSet getKeySet() {
return this.keys;
}
// --------------------------------------------------------------------------------------------
......
......@@ -35,6 +35,7 @@ import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.OptimizerFieldSet;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.BroadcastSS;
......@@ -45,19 +46,19 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategy.PartitionHashSS;
* A node in the optimizer plan that represents a PACT with a two different inputs, such as MATCH or CROSS.
* The two inputs are not substitutable in their sides.
*
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
* @author Stephan Ewen
*/
public abstract class TwoInputNode extends OptimizerNode
{
private List<OptimizerNode> cachedPlans; // a cache for the computed alternative plans
protected PactConnection input1 = null; // The first input edge
protected PactConnection input1; // The first input edge
protected PactConnection input2 = null; // The second input edge
protected PactConnection input2; // The second input edge
protected FieldList keySet1; // The set of key fields for the first input (order is relevant!)
protected OptimizerFieldSet keySet1; // The set of key fields for the first input
protected FieldList keySet2; // The set of key fields for the second input (order is relevant!)
protected OptimizerFieldSet keySet2; // The set of key fields for the second input
// ------------- Stub Annotations
......@@ -78,83 +79,83 @@ public abstract class TwoInputNode extends OptimizerNode
public TwoInputNode(DualInputContract<?> pactContract) {
    super(pactContract);

    // Both key sets start out empty.
    // NOTE(review): the contract's key column numbers (getKeyColumnNumbers(0) / (1)) are not
    // copied into the key sets here - confirm that they are populated elsewhere.
    this.keySet1 = new OptimizerFieldSet();
    this.keySet2 = new OptimizerFieldSet();
}
/**
 * Copy constructor that creates a copy of the given template node with different
 * predecessors. The constant-field annotations and the key sets are taken over from the
 * template by reference. For each predecessor that is not {@code null}, a new input
 * connection is created from the corresponding old connection.
 * <p>
 * If the template has open branches, the candidate selected for every branching node is
 * looked up in the branch plan of the first predecessor and, if not found there, in the
 * branch plan of the second predecessor, and is recorded in this node's branch plan.
 *
 * @param template
 *        The node to create a copy of.
 * @param pred1
 *        The new predecessor for the first input, may be {@code null}.
 * @param pred2
 *        The new predecessor for the second input, may be {@code null}.
 * @param conn1
 *        The old connection of the first input to copy properties from.
 * @param conn2
 *        The old connection of the second input to copy properties from.
 * @param globalProps
 *        The global properties of this copy.
 * @param localProps
 *        The local properties of this copy.
 * @throws CompilerException
 *         Thrown, if for an open branch of the template neither predecessor provides a
 *         selected candidate.
 */
protected TwoInputNode(TwoInputNode template, OptimizerNode pred1, OptimizerNode pred2, PactConnection conn1,
        PactConnection conn2, GlobalProperties globalProps, LocalProperties localProps)
{
    super(template, globalProps, localProps);

    // annotations and key sets are shared with the template
    this.constant1 = template.constant1;
    this.constant2 = template.constant2;
    this.keySet1 = template.keySet1;
    this.keySet2 = template.keySet2;

    // wire up the new predecessors
    if (pred1 != null) {
        this.input1 = new PactConnection(conn1, pred1, this);
    }
    if (pred2 != null) {
        this.input2 = new PactConnection(conn2, pred2, this);
    }

    // nothing to merge if the template has no open branches
    if (template.openBranches == null) {
        return;
    }

    if (this.branchPlan == null) {
        this.branchPlan = new HashMap<OptimizerNode, OptimizerNode>(8);
    }

    // merge the branch plan maps according to the template's stack of unclosed branches
    for (UnclosedBranchDescriptor descriptor : template.openBranches) {
        final OptimizerNode brancher = descriptor.branchingNode;
        OptimizerNode chosen = null;

        // first choice: the candidate known to predecessor 1
        if (pred1 != null && pred1.branchPlan != null) {
            chosen = pred1.branchPlan.get(brancher);
            this.branchPlan.put(brancher, chosen);
        }

        // fall back to the candidate known to predecessor 2
        if (chosen == null && pred2 != null && pred2.branchPlan != null) {
            chosen = pred2.branchPlan.get(brancher);
            this.branchPlan.put(brancher, chosen);
        }

        if (chosen == null) {
            throw new CompilerException(
                "Candidates for a node with open branches are missing information about the selected candidate ");
        }
    }
}
// /**
// * Copy constructor to create a copy of a node with different predecessors. The predecessors
// * is assumed to be of the same type as in the template node and merely copies with different
// * strategies, as they are created in the process of the plan enumeration.
// *
// * @param template
// * The node to create a copy of.
// * @param pred1
// * The new predecessor for the first input.
// * @param pred2
// * The new predecessor for the second input.
// * @param conn1
// * The old connection of the first input to copy properties from.
// * @param conn2
// * The old connection of the second input to copy properties from.
// * @param globalProps
// * The global properties of this copy.
// * @param localProps
// * The local properties of this copy.
// */
// protected TwoInputNode(TwoInputNode template, OptimizerNode pred1, OptimizerNode pred2, PactConnection conn1,
// PactConnection conn2, GlobalProperties globalProps, LocalProperties localProps)
// {
// super(template, globalProps, localProps);
//
// this.constant1 = template.constant1;
// this.constant2 = template.constant2;
// this.keySet1 = template.keySet1;
// this.keySet2 = template.keySet2;
//
// if(pred1 != null) {
// this.input1 = new PactConnection(conn1, pred1, this);
// }
//
// if(pred2 != null) {
// this.input2 = new PactConnection(conn2, pred2, this);
// }
//
// // merge the branchPlan maps according to the template's uncloseBranchesStack
// if (template.openBranches != null)
// {
// if (this.branchPlan == null) {
// this.branchPlan = new HashMap<OptimizerNode, OptimizerNode>(8);
// }
//
// for (UnclosedBranchDescriptor uc : template.openBranches) {
// OptimizerNode brancher = uc.branchingNode;
// OptimizerNode selectedCandidate = null;
//
// if(pred1 != null) {
// if(pred1.branchPlan != null) {
// // predecessor 1 has branching children, see if it got the branch we are looking for
// selectedCandidate = pred1.branchPlan.get(brancher);
// this.branchPlan.put(brancher, selectedCandidate);
// }
// }
//
// if(selectedCandidate == null && pred2 != null) {
// if(pred2.branchPlan != null) {
// // predecessor 2 has branching children, see if it got the branch we are looking for
// selectedCandidate = pred2.branchPlan.get(brancher);
// this.branchPlan.put(brancher, selectedCandidate);
// }
// }
//
// if (selectedCandidate == null) {
// throw new CompilerException(
// "Candidates for a node with open branches are missing information about the selected candidate ");
// }
// }
// }
// }
// ------------------------------------------------------------------------
......@@ -176,26 +177,6 @@ public abstract class TwoInputNode extends OptimizerNode
public PactConnection getSecondInConn() {
    // the second input edge of this node; null while no connection has been set
    return this.input2;
}
/**
 * Assigns the <tt>PactConnection</tt> that feeds the <i>first</i> input of this node.
 * The given connection replaces any connection set before; it is stored as is, without
 * validation.
 *
 * @param conn
 *        The connection to use as the first input.
 */
public void setFirstInConn(PactConnection conn) {
    this.input1 = conn;
}
/**
 * Assigns the <tt>PactConnection</tt> that feeds the <i>second</i> input of this node.
 * The given connection replaces any connection set before; it is stored as is, without
 * validation.
 *
 * @param conn
 *        The connection to use as the second input.
 */
public void setSecondInConn(PactConnection conn) {
    this.input2 = conn;
}
/**
* TODO
......@@ -326,27 +307,27 @@ public abstract class TwoInputNode extends OptimizerNode
*/
@Override
final public List<OptimizerNode> getAlternativePlans(CostEstimator estimator) {
    // reuse the alternatives from an earlier invocation, if they were cached
    if (this.cachedPlans != null) {
        return this.cachedPlans;
    }

    // descend into the producers of both inputs and collect their alternative plans
    final List<? extends OptimizerNode> firstInputPlans = this.getFirstPredNode().getAlternativePlans(estimator);
    final List<? extends OptimizerNode> secondInputPlans = this.getSecondPredNode().getAlternativePlans(estimator);

    // combine the sub-plans into the valid alternatives for this node, then prune them
    final List<OptimizerNode> alternatives = new ArrayList<OptimizerNode>();
    computeValidPlanAlternatives(firstInputPlans, secondInputPlans, estimator, alternatives);
    prunePlanAlternatives(alternatives);

    // cache the result only if we have multiple outputs --> this function gets invoked multiple times
    final boolean hasMultipleOutputs = this.getOutConns() != null && this.getOutConns().size() > 1;
    if (hasMultipleOutputs) {
        this.cachedPlans = alternatives;
    }

    return alternatives;
}
......@@ -371,11 +352,7 @@ public abstract class TwoInputNode extends OptimizerNode
* @return {@code true} if all values are valid, {@code false} otherwise
*/
protected boolean haveValidOutputEstimates(OptimizerNode subPlan) {
    // an estimated output size of -1 is treated as "no valid estimate"
    return subPlan.getEstimatedOutputSize() != -1;
}
/*
......@@ -504,35 +481,40 @@ public abstract class TwoInputNode extends OptimizerNode
* @param input The input for which key fields must be returned.
* @return the key fields of the given input.
*/
public FieldList getInputKeySet(int input) {
public OptimizerFieldSet getInputKeySet(int input) {
switch(input) {
case 0: return keySet1;
case 1: return keySet2;
default: throw new IndexOutOfBoundsException();
case 0: return keySet1;
case 1: return keySet2;
default: throw new IndexOutOfBoundsException();
}
}
public boolean isFieldKept(int input, int fieldNumber) {
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#isFieldConstant(int, int)
*/
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
switch(input) {
case 0:
if (this.constant1 == null) {
if (this.notConstant1 == null) {
return false;
} else {
return !this.notConstant1.contains(fieldNumber);
}
return this.notConstant1.contains(fieldNumber) == false;
} else {
return this.constant1.contains(fieldNumber);
}
return this.constant1.contains(fieldNumber);
case 1:
if (this.constant2 == null) {
if (this.notConstant2 == null) {
return false;
} else {
return !this.notConstant2.contains(fieldNumber);
}
return this.notConstant2.contains(fieldNumber) == false;
} else {
return this.constant2.contains(fieldNumber);
}
return this.constant2.contains(fieldNumber);
default:
throw new IndexOutOfBoundsException();
}
......
......@@ -15,13 +15,16 @@
package eu.stratosphere.pact.compiler.plan.candidate;
import eu.stratosphere.pact.compiler.plan.EstimateProvider;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
/**
*
*
* @author Stephan Ewen
*/
public class Channel
public class Channel implements EstimateProvider
{
/**
* Enumeration to indicate the mode of temporarily materializing the data that flows across a connection.
......@@ -38,6 +41,8 @@ public class Channel
private final PlanNode target = null;
private ShipStrategyType shipStrategy;
private TempMode tempMode;
private int replicationFactor;
......@@ -50,7 +55,7 @@ public class Channel
* @return The source.
*/
public PlanNode getSource() {
    // plain accessor for the source field
    return this.source;
}
/**
......@@ -59,7 +64,11 @@ public class Channel
* @return The target.
*/
public PlanNode getTarget() {
    // plain accessor for the target field
    return this.target;
}
/**
 * Gets the ship strategy type stored for this channel.
 *
 * @return The value of the {@code shipStrategy} field; {@code null} if none has been assigned.
 */
public ShipStrategyType getShipStrategy() {
    return this.shipStrategy;
}
/**
......
......@@ -154,10 +154,20 @@ public abstract class PlanNode implements Visitable<PlanNode>
}
}
/**
 * Gets the degree of parallelism of this node. This implementation always reports
 * a fixed value of one.
 *
 * @return The degree of parallelism, here always {@code 1}.
 */
public int getDegreeOfParallelism() {
    final int fixedParallelism = 1;
    return fixedParallelism;
}
/**
 * Gets the total memory available to this node, computed as a fixed amount per task
 * multiplied by the degree of parallelism.
 *
 * @return The total available memory in bytes (mock value: 3 MiBytes per task).
 */
public long getTotalAvailableMemory() {
    // mock: 3 MiBytes per task
    // The long literal forces the whole product into 64-bit arithmetic. With int operands
    // only, the multiplication would wrap around for a degree of parallelism above 682
    // before the result is widened to long.
    return 3L * 1024 * 1024 * getDegreeOfParallelism();
}
// --------------------------------------------------------------------------------------------
//
// --------------------------------------------------------------------------------------------
/**
 * Returns an iterator over {@link Channel} objects of this node.
 * NOTE(review): presumably the channels through which this node receives its inputs -
 * confirm against the implementing classes.
 *
 * @return An iterator over channels.
 */
public abstract Iterator<Channel> getInputs();
/**
 * Returns an iterator over {@link PlanNode} objects related to this node.
 * NOTE(review): presumably the nodes that produce this node's inputs - confirm against
 * the implementing classes.
 *
 * @return An iterator over plan nodes.
 */
public abstract Iterator<PlanNode> getPredecessors();
// --------------------------------------------------------------------------------------------
......
......@@ -41,7 +41,7 @@ import eu.stratosphere.pact.common.contract.MatchContract;
import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.util.DummyCoGroupStub;
......@@ -83,7 +83,7 @@ public class BranchingPlansCompilerTest {
// prepare the statistics
DataStatistics dataStats = new DataStatistics();
this.compiler = new PactCompiler(dataStats, new FixedSizeClusterCostEstimator(), dummyAddress);
this.compiler = new PactCompiler(dataStats, new DefaultCostEstimator(), dummyAddress);
}
catch (Exception ex) {
ex.printStackTrace();
......
......@@ -36,7 +36,7 @@ import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.MatchNode;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
......@@ -75,7 +75,7 @@ public class PartitionDOPChangeTest {
// prepare the statistics
DataStatistics dataStats = new DataStatistics();
this.compiler = new PactCompiler(dataStats, new FixedSizeClusterCostEstimator(), dummyAddress);
this.compiler = new PactCompiler(dataStats, new DefaultCostEstimator(), dummyAddress);
}
catch (Exception ex) {
ex.printStackTrace();
......
......@@ -35,7 +35,7 @@ import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
......@@ -73,7 +73,7 @@ public class UnionPropertyPropagationTest {
// prepare the statistics
DataStatistics dataStats = new DataStatistics();
this.compiler = new PactCompiler(dataStats, new FixedSizeClusterCostEstimator(), dummyAddress);
this.compiler = new PactCompiler(dataStats, new DefaultCostEstimator(), dummyAddress);
}
catch (Exception ex) {
ex.printStackTrace();
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.util.StringUtils;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.sopremo.execution.ExecutionResponse.ExecutionState;
......@@ -154,7 +154,7 @@ public class SopremoExecutionThread implements Runnable {
JobGraph getJobGraph(final Plan pactPlan) {
PactCompiler compiler =
new PactCompiler(new DataStatistics(), new FixedSizeClusterCostEstimator(), this.jobManagerAddress);
new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), this.jobManagerAddress);
final OptimizedPlan optPlan = compiler.compile(pactPlan);
JobGraphGenerator gen = new JobGraphGenerator();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册