diff --git a/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/nephele/api/Client.java b/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/nephele/api/Client.java index 2544ad7e0a46fa6fcea7ec7e40600c465b8b850a..40e31565621dfe10424d46725d96ca62b38fce1f 100644 --- a/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/nephele/api/Client.java +++ b/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/nephele/api/Client.java @@ -31,7 +31,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.DataStatistics; import eu.stratosphere.pact.compiler.PactCompiler; -import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator; +import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator; import eu.stratosphere.pact.compiler.jobgen.JSONGenerator; import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator; import eu.stratosphere.pact.compiler.plan.OptimizedPlan; @@ -63,7 +63,7 @@ public class Client { nepheleConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress()); nepheleConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort()); - this.compiler = new PactCompiler(new DataStatistics(), new FixedSizeClusterCostEstimator(), jobManagerAddress); + this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress); } /** @@ -87,7 +87,7 @@ public class Client { } final InetSocketAddress jobManagerAddress = new InetSocketAddress(address, port); - this.compiler = new PactCompiler(new DataStatistics(), new FixedSizeClusterCostEstimator(), jobManagerAddress); + this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress); } diff --git a/pact/pact-clients/src/main/java/eu/stratosphere/pact/testing/TestPlan.java b/pact/pact-clients/src/main/java/eu/stratosphere/pact/testing/TestPlan.java index 255424fd195ec6541cab48744922b8091bf73031..7a8f4cdc981a52dbd46dd38b629601f14dcbd185 100644 --- a/pact/pact-clients/src/main/java/eu/stratosphere/pact/testing/TestPlan.java +++ b/pact/pact-clients/src/main/java/eu/stratosphere/pact/testing/TestPlan.java @@ -77,7 +77,7 @@ import eu.stratosphere.pact.common.type.Value; import eu.stratosphere.pact.common.util.PactConfigConstants; import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.PactCompiler; -import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator; +import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator; import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator; import eu.stratosphere.pact.compiler.plan.OptimizedPlan; import eu.stratosphere.pact.compiler.plan.OptimizerNode; @@ -1075,7 +1075,7 @@ public class TestPlan implements Closeable { } } - private static final class CostEstimator extends FixedSizeClusterCostEstimator { + private static final class CostEstimator extends DefaultCostEstimator { private CostEstimator() { super(); } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/Costs.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/Costs.java index 112c90175741053eaaa2dac72b208060fb199a54..3633e0f7ffcd76f87ed6bdd01158bf8bdd6d23aa 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/Costs.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/Costs.java @@ -68,6 +68,18 @@ public class Costs implements Comparable, Cloneable { public void setNetworkCost(long bytes) { this.networkCost = bytes; } + + /** + * Adds the costs for network to the current network costs + * for this Costs object. + * + * @param bytes + * The network cost to add, in bytes to be transferred. + */ + public void addNetworkCost(long bytes) { + this.networkCost = + (this.networkCost < 0 || bytes < 0) ? -1 : this.networkCost + bytes; + } /** * Gets the costs for secondary storage. @@ -87,6 +99,18 @@ public class Costs implements Comparable, Cloneable { public void setSecondaryStorageCost(long bytes) { this.secondaryStorageCost = bytes; } + + /** + * Adds the costs for secondary storage to the current secondary storage costs + * for this Costs object. + * + * @param bytes + * The secondary storage cost to add, in bytes to be written and read. + */ + public void addSecondaryStorageCost(long bytes) { + this.secondaryStorageCost = + (this.secondaryStorageCost < 0 || bytes < 0) ? -1 : this.secondaryStorageCost + bytes; + } /** * Adds the given costs to these costs. If for one of the different cost components (network, secondary storage), diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/DataStatistics.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/DataStatistics.java index 334c8e1d0baec9b561fae843c6cd1841d8ede18f..f0d50505339873ad614f1374b3bad57718b4390a 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/DataStatistics.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/DataStatistics.java @@ -27,7 +27,7 @@ import eu.stratosphere.pact.common.io.statistics.BaseStatistics; *

* This class is thread safe. * - * @author Stephan Ewen (stephan.ewen@tu-berlin.de) + * @author Stephan Ewen */ public class DataStatistics { @@ -38,8 +38,7 @@ public class DataStatistics /** * Creates a new statistics object, with an empty cache. */ - public DataStatistics() - { + public DataStatistics() { this.baseStatisticsCache = new HashMap(); } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/GlobalProperties.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/GlobalProperties.java index 83b0a903f90a5030e030db612a19a4a3930c135f..70cdeff093e484faa6d3c9810464a2f195d968ee 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/GlobalProperties.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/GlobalProperties.java @@ -15,19 +15,22 @@ package eu.stratosphere.pact.compiler; +import eu.stratosphere.pact.common.contract.Order; import eu.stratosphere.pact.common.contract.Ordering; -import eu.stratosphere.pact.common.util.FieldList; import eu.stratosphere.pact.compiler.plan.OptimizerNode; /** * This class represents global properties of the data. Global properties are properties that * describe data across different partitions. + *

+ * Currently, the properties are the following: A partitioning type (ANY, HASH, RANGE), and EITHER an ordering (for range partitioning) + * or an FieldSet with the hash partitioning columns. */ public final class GlobalProperties implements Cloneable { private PartitionProperty partitioning; // the type partitioning - private OptimizerFieldSet partitioningFields; + private OptimizerFieldSet partitioningFields; // the fields which are partitioned private Ordering ordering; // order of the partitioned fields, if it is an ordered (range) range partitioning @@ -47,7 +50,15 @@ public final class GlobalProperties implements Cloneable public GlobalProperties(PartitionProperty partitioning, Ordering ordering) { this.partitioning = partitioning; this.ordering = ordering; - this.partitioningFields = OptimizerFieldList.getFromOrdering(ordering); + } + + /** + * @param partitioning + * @param partitioningFields + */ + public GlobalProperties(PartitionProperty partitioning, OptimizerFieldSet partitioningFields) { + this.partitioning = partitioning; + this.partitioningFields = partitioningFields; } // -------------------------------------------------------------------------------------------- @@ -67,7 +78,6 @@ public final class GlobalProperties implements Cloneable public void setPartitioning(PartitionProperty partitioning, Ordering ordering) { this.partitioning = partitioning; this.ordering = ordering; - this.partitioningFields = OptimizerFieldList.getFromOrdering(ordering); } /** @@ -79,8 +89,13 @@ public final class GlobalProperties implements Cloneable return partitioning; } + /** + * Gets the fields on which the data is partitioned. + * + * @return The partitioning fields. + */ public OptimizerFieldSet getPartitionedFields() { - return this.partitioning == PartitionProperty.NONE ? null : this.partitioningFields; + return this.partitioningFields; } /** @@ -105,6 +120,7 @@ public final class GlobalProperties implements Cloneable public void reset() { this.partitioning = PartitionProperty.NONE; this.ordering = null; + this.partitioningFields = null; } /** @@ -114,85 +130,60 @@ public final class GlobalProperties implements Cloneable * The output contract. * @return True, if any non-default value is preserved, false otherwise. */ - public boolean filterByNodesConstantSet(OptimizerNode node, int input) { - -// // check if partitioning survives -// if (partitionedFields != null) { -// for (Integer index : partitionedFields) { -// if (node.isFieldKept(input, index) == false) { -// partitionedFields = null; -// partitioning = PartitionProperty.NONE; -// } -// } -// } -// -// // check, whether the global order is preserved -// if (ordering != null) { -// ArrayList involvedIndexes = ordering.getInvolvedIndexes(); -// for (int i = 0; i < involvedIndexes.size(); i++) { -// if (node.isFieldKept(input, i) == false) { -// ordering = ordering.createNewOrderingUpToIndex(i); -// break; -// } -// } -// } -// -// return !isTrivial(); - return false; + public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) + { + // check if partitioning survives + if (this.ordering != null) { + for (int col : this.ordering.getInvolvedIndexes()) { + if (!node.isFieldConstant(input, col)) { + return null; + } + } + } else if (this.partitioningFields != null) { + for (ColumnWithType col : this.partitioningFields) { + if (!node.isFieldConstant(input, col.getColumnIndex())) { + return null; + } + } + } + return this; } - public GlobalProperties createInterestingGlobalProperties(OptimizerNode node, int input) { -// // check if partitioning survives -// ArrayList newPartitionedFields = null; -// PartitionProperty newPartitioning = PartitionProperty.NONE; -// Ordering newOrdering = null; -// -// if (node instanceof UnionNode) { -// //only HashPartitioning is interesting for union nodes -// if (partitioning == PartitionProperty.HASH_PARTITIONED) { -// // fields are kept as there is no user code involved -// newPartitioning = PartitionProperty.HASH_PARTITIONED; -// newPartitionedFields = partitionedFields; -// } -// } -// else { -// if (partitionedFields != null) { -// for (Integer index : partitionedFields) { -// if (node.isFieldKept(input, index) == true) { -// if (newPartitionedFields == null) { -// newPartitioning = this.partitioning; -// newPartitionedFields = new ArrayList(); -// } -// newPartitionedFields.add(index); -// } -// } -// } -// -// // check, whether the global order is preserved -// if (ordering != null) { -// boolean orderingPreserved = true; -// ArrayList involvedIndexes = ordering.getInvolvedIndexes(); -// for (int i = 0; i < involvedIndexes.size(); i++) { -// if (node.isFieldKept(input, i) == false) { -// orderingPreserved = false; -// break; -// } -// } -// -// if (orderingPreserved) { -// newOrdering = ordering.clone(); -// } -// } -// } -// -// if (newPartitioning == PartitionProperty.NONE && newOrdering == null) { - return null; -// } else { -// FieldList partitionFields = new FieldList(); -// partitionFields.addAll(newPartitionedFields); -// return new GlobalProperties(newPartitioning, newOrdering, partitionFields); -// } - + public GlobalProperties createInterestingGlobalPropertiesTopDownSubset(OptimizerNode node, int input) + { + // check, whether the global order is preserved + if (this.ordering != null) { + for (int i : this.ordering.getInvolvedIndexes()) { + if (!node.isFieldConstant(input, i)) { + return null; + } + } + return this; + } + else if (partitioningFields != null) { + boolean allIn = true; + boolean atLeasOneIn = false; + for (ColumnWithType col : this.partitioningFields) { + boolean res = node.isFieldConstant(input, col.getColumnIndex()); + allIn &= res; + atLeasOneIn |= res; + } + if (allIn) { + return this; + } else if (atLeasOneIn) { + OptimizerFieldSet newFields = new OptimizerFieldSet(); + for (ColumnWithType nc : this.partitioningFields) { + if (node.isFieldConstant(input, nc.getColumnIndex())) { + newFields.add(nc); + } + } + return new GlobalProperties(this.partitioning, newFields); + } else { + return null; + } + } else { + return this; + } } /** @@ -203,33 +194,44 @@ public final class GlobalProperties implements Cloneable * The properties for which to check whether they meet these properties. * @return True, if the properties are met, false otherwise. */ - public boolean isMetBy(GlobalProperties other) { -// if (this.partitioning != PartitionProperty.NONE) { -// if (this.partitioning == PartitionProperty.ANY) { -// if (other.partitioning == PartitionProperty.NONE) { -// return false; -// } -// } else if (other.partitioning != this.partitioning) { -// return false; -// } -// } -// -// FieldList otherPartitionedFields = other.getPartitionedFields(); -// if (this.partitionedFields != null) { -// if (other.partitionedFields == null) { -// return false; -// } -// if(!otherPartitionedFields.containsAll(this.partitionedFields)) { -// return false; -// } -// } -// -// if (this.ordering != null && this.ordering.isMetBy(other.getOrdering()) == false) { -// return false; -// } -// -// return true; - return false; + public boolean isMetBy(GlobalProperties other) + { + if (this.partitioning == PartitionProperty.NONE) { + return true; + } + + if (this.partitioning == PartitionProperty.ANY) { + if (other.partitioning == PartitionProperty.NONE) { + return false; + } + } else if (other.partitioning != this.partitioning) { + return false; + } + + if (this.ordering != null) { + if (other.ordering == null) + throw new CompilerException("BUG: Equal partitioning property, ordering not equally set."); + if (this.ordering.getInvolvedIndexes().isValidSubset(other.ordering.getInvolvedIndexes())) { + // check if the directions match + for (int i = 0; i < other.ordering.getNumberOfFields(); i++) { + Order to = this.ordering.getOrder(i); + Order oo = other.ordering.getOrder(i); + if (to == Order.NONE) + continue; + if (oo == Order.NONE) + return false; + if (to != Order.ANY && to != oo) + return false; + } + return true; + } else { + return false; + } + } else if (this.partitioningFields != null) { + return this.partitioningFields.isValidSubset(other.partitioningFields); + } else { + throw new RuntimeException("Found a partitioning property, but no fields."); + } } // ------------------------------------------------------------------------ @@ -242,9 +244,9 @@ public final class GlobalProperties implements Cloneable public int hashCode() { final int prime = 31; int result = 1; -// result = prime * result + ((partitioning == null) ? 0 : partitioning.hashCode()); -// result = prime * result + ((partitionedFields == null) ? 0 : partitionedFields.hashCode()); -// result = prime * result + ((ordering == null) ? 0 : ordering.hashCode()); + result = prime * result + ((partitioning == null) ? 0 : partitioning.hashCode()); + result = prime * result + ((partitioningFields == null) ? 0 : partitioningFields.hashCode()); + result = prime * result + ((ordering == null) ? 0 : ordering.hashCode()); return result; } @@ -254,22 +256,15 @@ public final class GlobalProperties implements Cloneable */ @Override public boolean equals(Object obj) { - if (this == obj) { - return true; - } else if (obj == null) { - return false; - } else if (getClass() != obj.getClass()) { + if (obj != null && obj instanceof GlobalProperties) { + GlobalProperties other = (GlobalProperties) obj; + return (ordering == other.getOrdering() || (ordering != null && ordering.equals(other.getOrdering()))) + && (partitioning == other.getPartitioning()) + && (partitioningFields == other.partitioningFields || + (partitioningFields != null && partitioningFields.equals(other.getPartitionedFields()))); + } else { return false; } - -// GlobalProperties other = (GlobalProperties) obj; -// if ((ordering == other.getOrdering() || (ordering != null && ordering.equals(other.getOrdering()))) -// && partitioning == other.getPartitioning() && partitionedFields != null -// && partitionedFields.equals(other.getPartitionedFields())) { -// return true; -// } else { - return false; -// } } /* @@ -278,31 +273,18 @@ public final class GlobalProperties implements Cloneable */ @Override public String toString() { - return "GlobalProperties [partitioning=" + partitioning + " on fields=" // + partitionedFields + ", ordering=" - + ordering + "]"; + return "GlobalProperties [partitioning=" + partitioning + + (this.partitioningFields == null ? "" : ", on fields " + this.partitioningFields) + + (this.ordering == null ? "" : ", with ordering " + this.ordering) + "]"; } /* * (non-Javadoc) * @see java.lang.Object#clone() */ - public GlobalProperties clone() throws CloneNotSupportedException { - GlobalProperties newProps = (GlobalProperties) super.clone(); - if (this.ordering != null) { - newProps.ordering = this.ordering.clone(); - } - - return newProps; - } - - /** - * Convenience method to create copies without the cloning exception. - * - * @return A perfect deep copy of this object. - */ - public final GlobalProperties createCopy() { + public GlobalProperties clone() { try { - return this.clone(); + return (GlobalProperties) super.clone(); } catch (CloneNotSupportedException cnse) { // should never happen, but propagate just in case throw new RuntimeException(cnse); diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/LocalProperties.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/LocalProperties.java index b902c1c87b66b51519e80ef77a4972eb2dfcf5c4..2dc8e517ead344f0d0f419c3ed95f42cc4632615 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/LocalProperties.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/LocalProperties.java @@ -30,7 +30,7 @@ public final class LocalProperties implements Cloneable { private Ordering ordering; // order inside a partition, null if not ordered - private FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped. + private OptimizerFieldSet groupedFields; // fields by which the stream is grouped. null if not grouped. private FieldSet uniqueFields; // fields whose value combination is unique in the stream @@ -41,27 +41,18 @@ public final class LocalProperties implements Cloneable */ public LocalProperties() {} - /** - * Creates a new instance of local properties that have only the given ordering as property. - * The ordering does automatically imply a grouping, though. - * - * @param ordering The ordering represented by these local properties. - */ - public LocalProperties(Ordering ordering) { - this.ordering = ordering; - this.groupedFields = ordering.getInvolvedIndexes(); - } - /** * Creates a new instance of local properties that have the given ordering as property, * field grouping and field uniqueness. Any of the given parameters may be null. Beware, though, * that a null grouping is inconsistent with a non-null ordering. + *

+ * This constructor is used only for internal copy creation. * * @param ordering The ordering represented by these local properties. * @param groupedFields The grouped fields for these local properties. * @param uniqueFields The unique fields for these local properties. */ - public LocalProperties(Ordering ordering, FieldSet groupedFields, FieldSet uniqueFields) { + private LocalProperties(Ordering ordering, OptimizerFieldSet groupedFields, FieldSet uniqueFields) { this.ordering = ordering; this.groupedFields = groupedFields; this.uniqueFields = uniqueFields; @@ -93,7 +84,7 @@ public final class LocalProperties implements Cloneable * * @return The grouped fields, or null if nothing is grouped. */ - public FieldSet getGroupedFields() { + public OptimizerFieldSet getGroupedFields() { return this.groupedFields; } @@ -102,7 +93,7 @@ public final class LocalProperties implements Cloneable * * @param groupedFields The fields that are grouped in these data properties. */ - public void setGroupedFields(FieldSet groupedFields) { + public void setGroupedFields(OptimizerFieldSet groupedFields) { this.groupedFields = groupedFields; } @@ -155,24 +146,32 @@ public final class LocalProperties implements Cloneable * * @return True, if the resulting properties are non trivial. */ - public boolean filterByNodesConstantSet(OptimizerNode node, int input) { + public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) + { // check, whether the local order is preserved + Ordering no = this.ordering; + FieldSet ngf = this.groupedFields; + FieldSet nuf = this.uniqueFields; + if (this.ordering != null) { FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); for (int i = 0; i < involvedIndexes.size(); i++) { if (!node.isFieldConstant(input, involvedIndexes.get(i))) { - this.ordering = this.ordering.createNewOrderingUpToIndex(i); + if (i == 0) { + no = null; + ngf = null; + } else { + no = this.ordering.createNewOrderingUpToIndex(i); + ngf = no.getInvolvedIndexes(); + } break; } } - } - - // check, whether the local key grouping is preserved - if (this.groupedFields != null) { + } else if (this.groupedFields != null) { + // check, whether the local key grouping is preserved for (Integer index : this.groupedFields) { if (!node.isFieldConstant(input, index)) { - this.groupedFields = null; - break; + ngf = null; } } } @@ -181,65 +180,15 @@ public final class LocalProperties implements Cloneable if (this.uniqueFields != null) { for (Integer index : this.uniqueFields) { if (!node.isFieldConstant(input, index)) { - this.uniqueFields = null; + nuf = null; break; } } } - return !isTrivial(); - } - - public LocalProperties createInterestingLocalProperties(OptimizerNode node, int input) - { -// // check, whether the local order is preserved -// boolean newGrouped = false; -// Ordering newOrdering = null; -// FieldSet newGroupedFields = null; -// -// -// // no interesting LocalProperties for input of Unions -// if (node instanceof UnionNode) return null; -// -// -// // check, whether the local key grouping is preserved -// if (this.groupedFields != null) { -// boolean groupingPreserved = true; -// for (Integer index : this.groupedFields) { -// if (node.isFieldKept(input, index) == false) { -// groupingPreserved = false; -// break; -// } -// } -// -// if (groupingPreserved) { -// newGroupedFields = (FieldSet) this.groupedFields.clone(); -// newGrouped = true; -// } -// } -// -// // check, whether the global order is preserved -// if (ordering != null) { -// boolean orderingPreserved = true; -// ArrayList involvedIndexes = ordering.getInvolvedIndexes(); -// for (int i = 0; i < involvedIndexes.size(); i++) { -// if (node.isFieldKept(input, i) == false) { -// orderingPreserved = false; -// break; -// } -// } -// -// if (orderingPreserved) { -// newOrdering = ordering.clone(); -// } -// } -// -// if (newGrouped == false && newOrdering == null) { - return null; -// } -// else { -// return new LocalProperties(newGrouped, newGroupedFields, newOrdering); -// } + return (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) ? this : + (no == null && ngf == null && nuf == null) ? null : + new LocalProperties(no, ngf, nuf); } /** diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java index 1f0a320a01005af648b71f5a952aa92a538fe994..47ac974acbec033b398d69b28bfc2d619d4d349c 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java @@ -46,7 +46,7 @@ import eu.stratosphere.pact.common.plan.Plan; import eu.stratosphere.pact.common.plan.Visitor; import eu.stratosphere.pact.common.util.PactConfigConstants; import eu.stratosphere.pact.compiler.costs.CostEstimator; -import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator; +import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator; import eu.stratosphere.pact.compiler.plan.CoGroupNode; import eu.stratosphere.pact.compiler.plan.CrossNode; import eu.stratosphere.pact.compiler.plan.DataSinkNode; @@ -363,7 +363,7 @@ public class PactCompiler { * The address of the job manager (to obtain system characteristics) is determined via the global configuration. */ public PactCompiler() { - this(null, new FixedSizeClusterCostEstimator()); + this(null, new DefaultCostEstimator()); } /** @@ -378,7 +378,7 @@ public class PactCompiler { * The statistics to be used to determine the input properties. */ public PactCompiler(DataStatistics stats) { - this(stats, new FixedSizeClusterCostEstimator()); + this(stats, new DefaultCostEstimator()); } /** diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java index 1ef2f17371d37336c0e5f8ab74547aa58d335007..138d765759dc66cf037ba198656b6509af029eee 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java @@ -15,53 +15,39 @@ package eu.stratosphere.pact.compiler.costs; -import java.util.List; +import java.util.Iterator; import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.Costs; -import eu.stratosphere.pact.compiler.plan.OptimizerNode; -import eu.stratosphere.pact.compiler.plan.PactConnection; -import eu.stratosphere.pact.runtime.resettable.BlockResettableMutableObjectIterator; +import eu.stratosphere.pact.compiler.plan.EstimateProvider; +import eu.stratosphere.pact.compiler.plan.candidate.Channel; +import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; + /** - * @author Stephan Ewen (stephan.ewen@tu-berlin.de) + * @author Stephan Ewen */ public abstract class CostEstimator { - public abstract void getRangePartitionCost(PactConnection conn, Costs costs); + public abstract void addRangePartitionCost(EstimateProvider estimates, Costs costs); - public abstract void getHashPartitioningCost(PactConnection conn, Costs costs); + public abstract void addHashPartitioningCost(EstimateProvider estimates, Costs costs); - public abstract void getBroadcastCost(PactConnection conn, Costs costs); + public abstract void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs); // ------------------------------------------------------------------------ - public abstract void getLocalSortCost(OptimizerNode node, PactConnection input, Costs costs); - - public abstract void getLocalDoubleSortMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2, - Costs costs); - - public abstract void getLocalSingleSortMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2, - Costs costs); + public abstract void addLocalSortCost(EstimateProvider estimates, long memorySize, Costs costs); - public abstract void getLocalMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2, - Costs costs); + public abstract void addLocalMergeCost(EstimateProvider estimates1, EstimateProvider estimates2, long memorySize, Costs costs); - public abstract void getLocalSortSelfNestedLoopCost(OptimizerNode node, PactConnection input, int bufferSize, Costs costs); - - public abstract void getLocalSelfNestedLoopCost(OptimizerNode node, PactConnection input, int bufferSize, Costs costs); + public abstract void addLocalSelfNestedLoopCost(EstimateProvider estimates, long bufferSize, Costs costs); - public abstract void getHybridHashCosts(OptimizerNode node, PactConnection buildSideInput, - PactConnection probeSideInput, Costs costs); + public abstract void addHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, long memorySize, Costs costs); - public abstract void getMainMemHashCosts(OptimizerNode node, PactConnection buildSideInput, - PactConnection probeSideInput, Costs costs); + public abstract void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs); - public abstract void getStreamedNestedLoopsCosts(OptimizerNode node, PactConnection outerSide, - PactConnection innerSide, int bufferSize, Costs costs); - - public abstract void getBlockNestedLoopsCosts(OptimizerNode node, PactConnection outerSide, PactConnection innerSide, - int blockSize, Costs costs); + public abstract void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs); // ------------------------------------------------------------------------ @@ -69,126 +55,106 @@ public abstract class CostEstimator { * This method computes the costs for an operator. It requires that all inputs are set and have a proper * ShipStrategy set, which is not equal to NONE. * - * @param n - * The node to compute the costs for. + * @param n The node to compute the costs for. */ - public void costOperator(OptimizerNode n) { - if (n.getIncomingConnections() == null) { - throw new CompilerException("Cannot compute costs on operator before incoming connections are set."); - } - + public void costOperator(PlanNode n) + { // initialize costs objects with currently unknown costs - Costs globCost = new Costs(); - Costs locCost = new Costs(); - - globCost.setNetworkCost(0); - globCost.setSecondaryStorageCost(0); + final Costs costs = new Costs(); + final long availableMemory = n.getTotalAvailableMemory(); - List incomingConnections = n.getIncomingConnections(); - - for (int i = 0; i < incomingConnections.size(); i++) { - - PactConnection connection = incomingConnections.get(i); + // add the shipping strategy costs + for (Iterator channels = n.getInputs(); channels.hasNext(); ) { + final Channel channel = channels.next(); - Costs tempGlobalCost = new Costs(); - - switch (connection.getShipStrategy().type()) { + switch (channel.getShipStrategy()) { case NONE: throw new CompilerException( "Cannot determine costs: Shipping strategy has not been set for an input."); case FORWARD: case PARTITION_LOCAL_HASH: - tempGlobalCost.setNetworkCost(0); - tempGlobalCost.setSecondaryStorageCost(0); break; case PARTITION_HASH: - getHashPartitioningCost(connection, tempGlobalCost); + addHashPartitioningCost(channel, costs); break; case PARTITION_RANGE: - getRangePartitionCost(connection, tempGlobalCost); + addRangePartitionCost(channel, costs); break; case BROADCAST: - getBroadcastCost(connection, tempGlobalCost); + addBroadcastCost(channel, channel.getReplicationFactor(), costs); break; case SFR: throw new CompilerException("Symmetric-Fragment-And-Replicate Strategy currently not supported."); default: - throw new CompilerException("Unknown shipping strategy for input: " + connection.getShipStrategy().name()); + throw new CompilerException("Unknown shipping strategy for input: " + channel.getShipStrategy()); } - - globCost.addCosts(tempGlobalCost); } - - PactConnection primConn = null; - PactConnection secConn = null; + Channel firstInput = null; + Channel secondInput = null; // get the inputs, if we have some { - if (incomingConnections.size() > 0) { - primConn = incomingConnections.get(0); - } - if (incomingConnections.size() > 1) { - secConn = incomingConnections.get(1); - } + Iterator channels = n.getInputs(); + if (channels.hasNext()) + firstInput = channels.next(); + if (channels.hasNext()) + secondInput = channels.next(); } // determine the local costs - locCost.setNetworkCost(0); switch (n.getLocalStrategy()) { case NONE: - locCost.setNetworkCost(0); - locCost.setSecondaryStorageCost(0); break; case COMBININGSORT: case SORT: - getLocalSortCost(n, primConn, locCost); + addLocalSortCost(firstInput, availableMemory, costs); break; case SORT_BOTH_MERGE: - getLocalDoubleSortMergeCost(n, primConn, secConn, locCost); + addLocalSortCost(firstInput, availableMemory / 2, costs); + addLocalSortCost(secondInput, availableMemory / 2, costs); + addLocalMergeCost(firstInput, secondInput, 0, costs); break; case SORT_FIRST_MERGE: - getLocalSingleSortMergeCost(n, primConn, secConn, locCost); + addLocalSortCost(firstInput, availableMemory, costs); + addLocalMergeCost(firstInput, secondInput, 0, costs); break; case SORT_SECOND_MERGE: - getLocalSingleSortMergeCost(n, secConn, primConn, locCost); + addLocalSortCost(secondInput, availableMemory, costs); + addLocalMergeCost(firstInput, secondInput, 0, costs); break; case MERGE: - getLocalMergeCost(n, primConn, secConn, locCost); + addLocalMergeCost(firstInput, secondInput, 0, costs); break; case SORT_SELF_NESTEDLOOP: - getLocalSortSelfNestedLoopCost(n, primConn, 10, locCost); + addLocalSortCost(firstInput, availableMemory, costs); + addLocalSelfNestedLoopCost(firstInput, 10, costs); break; case SELF_NESTEDLOOP: - getLocalSelfNestedLoopCost(n, primConn, 10, locCost); + addLocalSelfNestedLoopCost(firstInput, 10, costs); break; case HYBRIDHASH_FIRST: - getHybridHashCosts(n, primConn, secConn, locCost); + addHybridHashCosts(firstInput, secondInput, availableMemory, costs); break; case HYBRIDHASH_SECOND: - getHybridHashCosts(n, secConn, primConn, locCost); + addHybridHashCosts(secondInput, firstInput, availableMemory, costs); break; case NESTEDLOOP_BLOCKED_OUTER_FIRST: - getBlockNestedLoopsCosts(n, primConn, secConn, BlockResettableMutableObjectIterator.MIN_BUFFER_SIZE, locCost); + addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, costs); break; case NESTEDLOOP_BLOCKED_OUTER_SECOND: - getBlockNestedLoopsCosts(n, secConn, primConn, BlockResettableMutableObjectIterator.MIN_BUFFER_SIZE, locCost); + addBlockNestedLoopsCosts(secondInput, firstInput, availableMemory, costs); break; case NESTEDLOOP_STREAMED_OUTER_FIRST: - getStreamedNestedLoopsCosts(n, primConn, secConn, - 128 * 1024, locCost); + addStreamedNestedLoopsCosts(firstInput, secondInput, availableMemory, costs); break; case NESTEDLOOP_STREAMED_OUTER_SECOND: - getStreamedNestedLoopsCosts(n, secConn, primConn, - 128 * 1024, locCost); + addStreamedNestedLoopsCosts(secondInput, firstInput, availableMemory, costs); break; default: throw new CompilerException("Unknown local strategy: " + n.getLocalStrategy().name()); } - // add the costs and set them - globCost.addCosts(locCost); - n.setCosts(globCost); + n.setCosts(costs); } - } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/DefaultCostEstimator.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/DefaultCostEstimator.java new file mode 100644 index 0000000000000000000000000000000000000000..d182f216310b8957bfe5d21f3f99f8f01667cc41 --- /dev/null +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/DefaultCostEstimator.java @@ -0,0 +1,120 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.compiler.costs; + +import eu.stratosphere.pact.compiler.Costs; +import eu.stratosphere.pact.compiler.plan.EstimateProvider; + +/** + * A default cost estimator that has access to basic size and cardinality estimates. + *

+ * For robustness reasons, we always assume that the whole data is shipped during a repartition step. We deviate from + * the typical estimate of (n - 1) / n (with n being the number of nodes), because for a parallelism + * of 1, that would yield a shipping of zero bytes. While this is usually correct, the runtime scheduling may still + * choose to move tasks to different nodes, so that we do not know that no data is shipped. + * + * @author Stephan Ewen + */ +public class DefaultCostEstimator extends CostEstimator +{ + @Override + public void addRangePartitionCost(EstimateProvider estimates, Costs costs) + { + final long dataSize = estimates.getEstimatedOutputSize(); + if (dataSize != -1) { + // Assume sampling of 10% of the data and spilling it to disk + final long sampled = (long) (dataSize * 1.1f); + // set shipping costs + costs.addNetworkCost(dataSize + sampled); + // we assume a two phase merge sort, so all in all 2 I/O operations per block + costs.addSecondaryStorageCost(2 * sampled); + } else { + // no costs known + costs.setNetworkCost(-1); + costs.setSecondaryStorageCost(-1); + } + } + + @Override + public void addHashPartitioningCost(EstimateProvider estimates, Costs costs) { + // conservative estimate: we need ship the whole data over the network to establish the + // partitioning. no disk costs. + final long estOutShipSize = estimates.getEstimatedOutputSize(); + if (estOutShipSize == -1) { + costs.addNetworkCost(-1); + } else { + costs.addNetworkCost(estOutShipSize); + } + } + + @Override + public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) { + // assumption: we need ship the whole data over the network to each node. + final long estOutShipSize = estimates.getEstimatedOutputSize(); + if (estOutShipSize < 0) { + costs.setNetworkCost(-1); + } else { + costs.addNetworkCost(replicationFactor * estOutShipSize); + } + } + + @Override + public void addLocalSortCost(EstimateProvider estimates, long availableMemory, Costs costs) { + final long s = estimates.getEstimatedOutputSize(); + // we assume a two phase merge sort, so all in all 2 I/O operations per block + costs.addSecondaryStorageCost(s < 0 ? -1 : 2 * s); + } + + @Override + public void addLocalMergeCost(EstimateProvider input1, EstimateProvider input2, long availableMemory, Costs costs) { + } + + @Override + public void addLocalSelfNestedLoopCost(EstimateProvider estimates, long bufferSize, Costs costs) { + long is = estimates.getEstimatedOutputSize(); + long ic = estimates.getEstimatedNumRecords(); + long loops = ic == -1 ? 10 : ic / bufferSize; + costs.addSecondaryStorageCost(is == -1 ? -1 : (loops + 2) * is); + } + + @Override + public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, long availableMemory, Costs costs) { + long bs = buildSideInput.getEstimatedOutputSize(); + long ps = probeSideInput.getEstimatedOutputSize(); + // half the table has to spill time 2 I/O + costs.addSecondaryStorageCost(bs < 0 || ps < 0 ? -1 : bs + ps); + } + + @Override + public void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs) { + long is = innerSide.getEstimatedOutputSize(); + long oc = outerSide.getEstimatedNumRecords(); + + // check whether the inner side can be cached + if (is > bufferSize) { + costs.addSecondaryStorageCost(is >= 0 && oc >= 0 ? oc * is : -1); + } + } + + @Override + public void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs) { + long is = innerSide.getEstimatedOutputSize(); + long os = outerSide.getEstimatedOutputSize(); + long loops = os < 0 ? 1000 : Math.max(os / blockSize, 1); + + costs.addSecondaryStorageCost(is == -1 ? -1 : loops * is); + } +} diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/FixedSizeClusterCostEstimator.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/FixedSizeClusterCostEstimator.java deleted file mode 100644 index 73e53f05831d5bf7913fe50d190afd5a3eee6cef..0000000000000000000000000000000000000000 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/FixedSizeClusterCostEstimator.java +++ /dev/null @@ -1,323 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ - -package eu.stratosphere.pact.compiler.costs; - -import eu.stratosphere.pact.common.contract.DataDistribution; -import eu.stratosphere.pact.compiler.Costs; -import eu.stratosphere.pact.compiler.plan.OptimizerNode; -import eu.stratosphere.pact.compiler.plan.PactConnection; - -/** - * A cost estimator that assumes a given number of nodes in order to estimate the costs of - * shipping strategies. - *

- * For robustness reasons, we always assume that the whole data is shipped during a repartition step. We deviate from - * the typical estimate of (n - 1) / n (with n being the number of nodes), because for a parallelism - * of 1, that would yield a shipping of zero bytes. While this is usually correct, the runtime scheduling may still - * choose to move tasks to different nodes, so that we do not know that no data is shipped. - * - * @author Stephan Ewen (stephan.ewen@tu-berlin.de) - */ -public class FixedSizeClusterCostEstimator extends CostEstimator { - - /** - * Creates a new cost estimator that assumes four nodes, unless - * the parameters of a contract indicate anything else. - * - */ - public FixedSizeClusterCostEstimator() { - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getRangePartitionCost( - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getRangePartitionCost(PactConnection conn, Costs costs) { - - Class distribution = null; - - if(distribution == null) { - - if(conn.getSourcePact().getEstimatedOutputSize() != -1) { - // Assume sampling of 10% of the data - long estOutShipSize = (long)(conn.getReplicationFactor() * conn.getSourcePact().getEstimatedOutputSize() * 1.1); - // set shipping costs - costs.setNetworkCost((long) (1.5f * estOutShipSize)); - // we assume a two phase merge sort, so all in all 2 I/O operations per block - costs.setSecondaryStorageCost(2 * estOutShipSize); - } else { - // no costs known - costs.setNetworkCost(-1); - costs.setSecondaryStorageCost(-1); - } - - // TODO: reactivate if data distribution becomes available -// } else { -// // If data distribution is given, no extra sampling has to be done => same cost as HashPartitioning -// -// if(conn.getSourcePact().getEstimatedOutputSize() != -1) { -// long estOutShipSize = (long) conn.getReplicationFactor() * conn.getSourcePact().getEstimatedOutputSize(); -// costs.setNetworkCost(estOutShipSize); -// } else { -// // no costs known -// costs.setNetworkCost(-1); -// } -// costs.setSecondaryStorageCost(0); - - } - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getHashPartitioningCost( - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getHashPartitioningCost(PactConnection conn, Costs costs) { - // conservative estimate: we need ship the whole data over the network to establish the - // partitioning. no disk costs. - final long estOutShipSize = conn.getReplicationFactor() * conn.getSourcePact().getEstimatedOutputSize(); - - if (estOutShipSize == -1) { - costs.setNetworkCost(-1); - } else { - costs.setNetworkCost(estOutShipSize); - } - - costs.setSecondaryStorageCost(0); - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getBroadcastCost( - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getBroadcastCost(PactConnection conn, Costs costs) { - // estimate: we need ship the whole data over the network to each node. - final int replicationFactor = conn.getReplicationFactor() < 1 ? 100 : conn.getReplicationFactor(); - final long estOutShipSize = replicationFactor * conn.getSourcePact().getEstimatedOutputSize(); - - if (estOutShipSize == -1) { - costs.setNetworkCost(-1); - } else { - costs.setNetworkCost(estOutShipSize); - } - - // no disk costs. - costs.setSecondaryStorageCost(0); - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getLocalSortCost( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getLocalSortCost(OptimizerNode node, PactConnection input, Costs costs) { - costs.setNetworkCost(0); - - long s = input.getSourcePact().getEstimatedOutputSize() * input.getReplicationFactor(); - // we assume a two phase merge sort, so all in all 2 I/O operations per block - costs.setSecondaryStorageCost(s < 0 ? -1 : 2 * s); - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getLocalDoubleSortMergeCost( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getLocalDoubleSortMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2, Costs costs) { - costs.setNetworkCost(0); - - long s1 = input1.getSourcePact().getEstimatedOutputSize() * input1.getReplicationFactor(); - long s2 = input2.getSourcePact().getEstimatedOutputSize() * input2.getReplicationFactor(); - - // we assume a two phase merge sort, so all in all 2 I/O operations per block for both sides - costs.setSecondaryStorageCost(s1 < 0 || s2 < 0 ? -1 : 2 * (s1 + s2)); - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getLocalSingleSortMergeCost( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getLocalSingleSortMergeCost(OptimizerNode node, PactConnection unsortedInput, PactConnection sortedInput, Costs costs) { - costs.setNetworkCost(0); - - long s1 = unsortedInput.getSourcePact().getEstimatedOutputSize() * unsortedInput.getReplicationFactor(); - // we assume a two phase merge sort, so all in all 2 I/O operations per block for the unsorted input - costs.setSecondaryStorageCost(s1 < 0 ? -1 : 2 * s1); - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getLocalMergeCost( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getLocalMergeCost(OptimizerNode node, PactConnection input1, PactConnection input2, Costs costs) { - costs.setNetworkCost(0); - - // inputs are sorted. No network and secondary storage costs produced - costs.setSecondaryStorageCost(0); - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getLocalSortSelfNestedLoopCost( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * int, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getLocalSortSelfNestedLoopCost(OptimizerNode node, PactConnection input, int bufferSize, Costs costs) { - costs.setNetworkCost(0); - - long is = input.getSourcePact().getEstimatedOutputSize() * input.getReplicationFactor(); - long ic = input.getSourcePact().getEstimatedNumRecords(); - long loops = ic < 0 ? 1000 : ic / bufferSize; - - // we assume a two phase merge sort, so all in all 2 I/O operations per block - // plus I/O for the SpillingResettableIterators: 2 for writing plus reading - costs.setSecondaryStorageCost(is < 0 ? -1 : (loops + 4) * is); - - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getLocalSelfNestedLoopCost( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * int, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getLocalSelfNestedLoopCost(OptimizerNode node, PactConnection input, int bufferSize, Costs costs) { - - long is = input.getSourcePact().getEstimatedOutputSize() * input.getReplicationFactor(); - long ic = input.getSourcePact().getEstimatedNumRecords(); - long loops = ic == -1 ? 10 : ic / bufferSize; - - costs.setSecondaryStorageCost(is == -1 ? -1 : (loops + 2) * is); - - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getHybridHashCosts( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getHybridHashCosts(OptimizerNode node, PactConnection buildSideInput, PactConnection probeSideInput, - Costs costs) { - costs.setNetworkCost(0); - - long bs = buildSideInput.getSourcePact().getEstimatedOutputSize() * buildSideInput.getReplicationFactor(); - long ps = probeSideInput.getSourcePact().getEstimatedOutputSize() * probeSideInput.getReplicationFactor(); - - // we assume that the build side has to spill and requires one recursive repartitioning - // so 4 I/O operations per block on the build side, and 2 on the probe side - // NOTE: This is currently artificially expensive to prevent the compiler from using the hash-strategies, which are - // being reworked from in-memory and grace towards a gradually degrading hybrid hash join - costs.setSecondaryStorageCost(bs < 0 || ps < 0 ? -1 : 2 * bs + ps); - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getMainMemHashCosts( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getMainMemHashCosts(OptimizerNode node, PactConnection buildSideInput, PactConnection probeSideInput, - Costs target) { - target.setNetworkCost(0); - target.setSecondaryStorageCost(0); - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getStreamedNestedLoopsCosts( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getStreamedNestedLoopsCosts(OptimizerNode node, PactConnection outerSide, PactConnection innerSide, - int bufferSize, Costs costs) - { - costs.setNetworkCost(0); - - long is = innerSide.getSourcePact().getEstimatedOutputSize() * innerSide.getReplicationFactor(); - int repFac = innerSide.getReplicationFactor(); - long oc = outerSide.getSourcePact().getEstimatedNumRecords() * outerSide.getReplicationFactor(); - - // check whether the inner side can be cached - if (is < (bufferSize * repFac)) { - is = 0; - } - - costs.setSecondaryStorageCost(is >= 0 && oc >= 0 ? oc * is : -1); - } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.costs.CostEstimator#getBlockNestedLoopsCosts( - * eu.stratosphere.pact.compiler.plan.OptimizerNode, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * eu.stratosphere.pact.compiler.plan.PactConnection, - * int, - * eu.stratosphere.pact.compiler.Costs) - */ - @Override - public void getBlockNestedLoopsCosts(OptimizerNode node, PactConnection outerSide, PactConnection innerSide, - int blockSize, Costs costs) - { - costs.setNetworkCost(0); - - long is = innerSide.getSourcePact().getEstimatedOutputSize() * innerSide.getReplicationFactor(); - long os = outerSide.getSourcePact().getEstimatedOutputSize() * outerSide.getReplicationFactor(); - long loops = Math.max(os < 0 ? 1000 : os / blockSize, 1); - - costs.setSecondaryStorageCost(is == -1 ? -1 : loops * is); - } -} diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java index 34c5639109da8c4f17da161ab54cf5484ef76b13..6e55a1fe0bbb1fb8bdc09816303c4980c616dc75 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java @@ -25,7 +25,6 @@ import eu.stratosphere.pact.common.contract.GenericDataSink; import eu.stratosphere.pact.common.contract.Ordering; import eu.stratosphere.pact.common.plan.Visitor; import eu.stratosphere.pact.compiler.CompilerException; -import eu.stratosphere.pact.compiler.Costs; import eu.stratosphere.pact.compiler.DataStatistics; import eu.stratosphere.pact.compiler.PartitionProperty; import eu.stratosphere.pact.compiler.costs.CostEstimator; @@ -228,45 +227,36 @@ public class DataSinkNode extends OptimizerNode { // range partitioned only or global sort // in both cases create a range partitioned only IP InterestingProperties partitioningProps = new InterestingProperties(); + partitioningProps.getGlobalProperties().setPartitioning(PartitionProperty.RANGE_PARTITIONED, partitioning); - estimator.getRangePartitionCost(this.input, partitioningProps.getMaximalCosts()); + estimator.addRangePartitionCost(this.input, partitioningProps.getMaximalCosts()); + this.input.addInterestingProperties(partitioningProps); + } else if (localOrder == null) { + this.input.setNoInterestingProperties(); } if (localOrder != null) { if (partitioning != null && localOrder.equals(partitioning)) { // global sort case: create IP for range partitioned and sorted InterestingProperties globalSortProps = new InterestingProperties(); + globalSortProps.getGlobalProperties().setPartitioning(PartitionProperty.RANGE_PARTITIONED, partitioning); - estimator.getRangePartitionCost(this.input, globalSortProps.getMaximalCosts()); + estimator.addRangePartitionCost(this.input, globalSortProps.getMaximalCosts()); + globalSortProps.getLocalProperties().setOrdering(partitioning); + estimator.addLocalSortCost(this.input, -1, globalSortProps.getMaximalCosts()); - Costs sortCosts = new Costs(); - estimator.getLocalSortCost(this, this.input, sortCosts); - globalSortProps.getMaximalCosts().addCosts(sortCosts); this.input.addInterestingProperties(globalSortProps); } else { // local order only + InterestingProperties localSortProps = new InterestingProperties(); + + localSortProps.getLocalProperties().setOrdering(partitioning); + estimator.addLocalSortCost(this.input, -1, localSortProps.getMaximalCosts()); + + this.input.addInterestingProperties(localSortProps); } - } else { - - - - Ordering localOrdering = getPactContract().getLocalOrder(); - if (localOrdering != null && localOrdering.equals(partitioning)) { - InterestingProperties i = partitioningProps.clone(); - i.getLocalProperties().setOrdering(partitioning); - this.input.addInterestingProperties(i); - } - } - else if (getPactContract().getLocalOrder() != null) { - InterestingProperties i = new InterestingProperties(); - i.getLocalProperties().setOrdering(getPactContract().getLocalOrder()); - estimator.getLocalSortCost(this, this.input, i.getMaximalCosts()); - this.input.addInterestingProperties(i); - - } else { - this.input.setNoInterestingProperties(); } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/EstimateProvider.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/EstimateProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..77d82bb5fb6d4f55a7be92f29833b3fb6db94081 --- /dev/null +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/EstimateProvider.java @@ -0,0 +1,47 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.compiler.plan; + +import java.util.Map; + +import eu.stratosphere.pact.common.util.FieldSet; + +/** + * + * + * @author Stephan Ewen + */ +public interface EstimateProvider +{ + /** + * Gets the estimated output size from this node. + * + * @return The estimated output size. + */ + long getEstimatedOutputSize(); + + /** + * Gets the estimated number of records in the output of this node. + * + * @return The estimated number of records. + */ + long getEstimatedNumRecords(); + + + Map getEstimatedCardinalities(); + + long getEstimatedCardinality(FieldSet cP); +} diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/InterestingProperties.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/InterestingProperties.java index 1b16189675b0d2d555ee34a6c3e00a5e4fcb4083..e5cf6c1ab19b496373024133356805559e145e57 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/InterestingProperties.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/InterestingProperties.java @@ -98,17 +98,6 @@ public class InterestingProperties implements Cloneable return globalProps; } - /** - * Copies the maximal costs from the given costs object. - * - * @param c - * The costs object to copy. - */ - public void copyMaximalCosts(Costs c) { - this.maximalCosts.setNetworkCost(c.getNetworkCost()); - this.maximalCosts.setSecondaryStorageCost(c.getSecondaryStorageCost()); - } - /** * Checks, if the given InterestingProperties object has the same properties as this one. * This method is a lesser version of the equals(...) method, as it does not take the @@ -133,6 +122,22 @@ public class InterestingProperties implements Cloneable public boolean isMetBy(PlanNode node) { return globalProps.isMetBy(node.getGlobalProperties()) && localProps.isMetBy(node.getLocalProperties()); } + + public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) { + GlobalProperties gp = this.globalProps.filterByNodesConstantSet(node, input); + LocalProperties lp = this.localProps.filterByNodesConstantSet(node, input); + + if (gp != this.globalProps || lp != this.localProps) { + if (gp == null && lp == null) { + return null; + } else { + return new InterestingProperties(this.maximalCosts, + gp == null ? new GlobalProperties() : gp, lp == null ? new LocalProperties() : lp); + } + } else { + return this; + } + } // ------------------------------------------------------------------------ @@ -156,29 +161,14 @@ public class InterestingProperties implements Cloneable */ @Override public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - InterestingProperties other = (InterestingProperties) obj; - if (globalProps == null) { - if (other.globalProps != null) - return false; - } else if (!globalProps.equals(other.globalProps)) + if (obj != null && obj instanceof InterestingProperties) { + InterestingProperties other = (InterestingProperties) obj; + return this.globalProps.equals(other.globalProps) && + this.localProps.equals(other.localProps) && + this.maximalCosts.equals(other.maximalCosts); + } else { return false; - if (localProps == null) { - if (other.localProps != null) - return false; - } else if (!localProps.equals(other.localProps)) - return false; - if (maximalCosts == null) { - if (other.maximalCosts != null) - return false; - } else if (!maximalCosts.equals(other.maximalCosts)) - return false; - return true; + } } /* @@ -196,11 +186,9 @@ public class InterestingProperties implements Cloneable * @see java.lang.Object#clone() */ @Override - public InterestingProperties clone() - { -// return new InterestingProperties(maximalCosts.createCopy(), -// globalProps.createCopy(), localProps.createCopy()); - return null; + public InterestingProperties clone() { + return new InterestingProperties(this.maximalCosts.clone(), + this.globalProps.clone(), this.localProps.createCopy()); } // ------------------------------------------------------------------------ @@ -219,28 +207,23 @@ public class InterestingProperties implements Cloneable * @param toMerge * The properties that is added / merged into the previous collection. */ - public static void mergeUnionOfInterestingProperties(List properties, - InterestingProperties toMerge) { - boolean subsumed = false; - + public static void mergeUnionOfInterestingProperties( + List properties, InterestingProperties toMerge) + { // go through all existing property sets - for (InterestingProperties toCheck : properties) { + for (int i = 0; i < properties.size(); i++) { + InterestingProperties toCheck = properties.get(i); if (toCheck.hasEqualProperties(toMerge)) { - subsumed = true; // the properties are equal. keep the one with the higher maximal cost, // because it indicates, that the properties are worth more. if (toMerge.getMaximalCosts().compareTo(toCheck.getMaximalCosts()) > 0) { - toCheck.copyMaximalCosts(toMerge.getMaximalCosts()); + properties.set(i, toMerge); } - - break; + return; } } - - // if it was not subsumes, add it - if (!subsumed) { - properties.add(toMerge.clone()); - } + // if it was not subsumed, add it + properties.add(toMerge.clone()); } /** @@ -264,74 +247,31 @@ public class InterestingProperties implements Cloneable mergeUnionOfInterestingProperties(properties, candidate); } } - -// /** -// * Utility method that checks, how the given interesting properties that a node receives from its -// * successors, are relevant to its predecessors. That depends, of course, on the output contract, -// * as that determines which properties can be inferred to be preserved by the node. The returned -// * set will not contain interesting properties objects that are reduced to trivial properties, -// * i.e. where all properties have the default value, such as for example none for the -// * partitioning. -// * -// * @param props -// * The collection of interesting properties that a node receives from its successors. -// * @param contract -// * The output contract. -// * @return A collection with the interesting properties that are relevant with respect to the given output -// * contract. Contains the same objects as in the input set, with properties accordingly restricted. -// * Returns always a modifiable collection, even if no properties are preserved. -// */ -// public static final List filterByOutputContract(List props, -// OutputContract contract) { -// // if the output contract is NONE, it basically destroys all properties, -// // as they always refer to the key, and the key is potentially switched -// if (contract == OutputContract.None) { -// return new ArrayList(); -// } else { -// List preserved = new ArrayList(); -// -// // process all interesting properties -// for (InterestingProperties p : props) { -// boolean nonTrivial = p.getGlobalProperties().filterByOutputContract(contract); -// nonTrivial |= p.getLocalProperties().filterByOutputContract(contract); -// -// if (nonTrivial) { -// preserved.add(p); -// } -// } -// -// return preserved; -// } -// } - public static final List createInterestingPropertiesForInput(List props, - OptimizerNode node, int input) { - List preserved = new ArrayList(); + public static final List filterInterestingPropertiesForInput( + List props, OptimizerNode node, int input) + { + List preserved = null; for (InterestingProperties p : props) { -// GlobalProperties preservedGp = p.getGlobalProperties().createCopy(); -// LocalProperties preservedLp = p.getLocalProperties().createCopy(); -// boolean nonTrivial = preservedGp.filterByNodesConstantSet(node, input); -// nonTrivial |= preservedLp.filterByNodesConstantSet(node, input); + final InterestingProperties filteredProps = p.filterByCodeAnnotations(node, input); + if (filteredProps == null) { + continue; + } - GlobalProperties preservedGp = - p.getGlobalProperties().createInterestingGlobalProperties(node, input); - LocalProperties preservedLp = - p.getLocalProperties().createInterestingLocalProperties(node, input); - - if (preservedGp != null || preservedLp != null) { - if (preservedGp == null) { - preservedGp = new GlobalProperties(); - } - if (preservedLp == null) { - preservedLp = new LocalProperties(); - } - InterestingProperties newIp = new InterestingProperties(p.getMaximalCosts().clone(), preservedGp, preservedLp); - mergeUnionOfInterestingProperties(preserved, newIp); + final GlobalProperties topDownAdjustedGP = filteredProps.globalProps.createInterestingGlobalPropertiesTopDownSubset(node, input); + if (topDownAdjustedGP == null && filteredProps.localProps.isTrivial()) + continue; + + if (preserved == null) { + preserved = new ArrayList(); } + + final InterestingProperties toAdd = topDownAdjustedGP == filteredProps.getGlobalProperties() ? filteredProps : + new InterestingProperties(filteredProps.getMaximalCosts(), topDownAdjustedGP, filteredProps.localProps); + mergeUnionOfInterestingProperties(preserved, toAdd); } - return preserved; } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java index 1e838d942dcbde99a352036157c7b6ce64c5dfe5..fa3cb5ba5a9cf6eb49c4542e1ddf07bdccbc09b4 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java @@ -24,7 +24,7 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; /** * The Optimizer representation of a Map contract node. * - * @author Stephan Ewen (stephan.ewen@tu-berlin.de) + * @author Stephan Ewen */ public class MapNode extends SingleInputNode { @@ -100,13 +100,12 @@ public class MapNode extends SingleInputNode { // if so, propagate to the child. List thisNodesIntProps = getInterestingProperties(); - List props = InterestingProperties.createInterestingPropertiesForInput(thisNodesIntProps, - this, 0); + List props = InterestingProperties.filterInterestingPropertiesForInput(thisNodesIntProps, this, 0); - if (!props.isEmpty()) { - this.inConn.addAllInterestingProperties(props); - } else { + if (props.isEmpty()) { this.inConn.setNoInterestingProperties(); + } else { + this.inConn.addAllInterestingProperties(props); } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java index 57724a4a3291a122563d4f2497128ecf0ac30f68..26a7e00385558a461d262eeb81971586417581d9 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java @@ -45,7 +45,7 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; * @author Stephan Ewen * @author Fabian Hueske */ -public abstract class OptimizerNode implements Visitable +public abstract class OptimizerNode implements Visitable, EstimateProvider { // ------------------------------------------------------------------------ // Members diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java index 4afd561126b94bb026853ee2f2adac70b81e4ff9..4affa9aedc752617a8880e32f484671d65d75886 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java @@ -19,7 +19,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; +import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.LocalProperties; @@ -33,7 +35,7 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; * The connections are also used by the optimization algorithm to propagate interesting properties from the sinks in the * direction of the sources. */ -public class PactConnection +public class PactConnection implements EstimateProvider { private final OptimizerNode sourcePact; // The source node of the connection @@ -223,6 +225,42 @@ public class PactConnection return PactConnection.getLocalPropertiesAfterConnection(this.sourcePact, this.targetPact, this.shipStrategy); } + // -------------------------------------------------------------------------------------------- + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedOutputSize() + */ + @Override + public long getEstimatedOutputSize() { + return this.sourcePact.getEstimatedOutputSize(); + } + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedNumRecords() + */ + @Override + public long getEstimatedNumRecords() { + return this.sourcePact.getEstimatedNumRecords(); + } + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedCardinalities() + */ + @Override + public Map getEstimatedCardinalities() { + return this.sourcePact.getEstimatedCardinalities(); + } + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedCardinality(eu.stratosphere.pact.common.util.FieldSet) + */ + @Override + public long getEstimatedCardinality(FieldSet cP) { + return this.sourcePact.getEstimatedCardinality(cP); + } + + // -------------------------------------------------------------------------------------------- + /* * (non-Javadoc) * @see java.lang.Object#toString() @@ -348,5 +386,4 @@ public class PactConnection // return lp; return null; } - } \ No newline at end of file diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/ReduceNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/ReduceNode.java index 318f24b400f7840702d4317ebfc90d91b0f7246b..0102ad0db8ca09989a65e305788838e3195e098b 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/ReduceNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/ReduceNode.java @@ -166,39 +166,25 @@ public class ReduceNode extends SingleInputNode { public void computeInterestingPropertiesForInputs(CostEstimator estimator) { // check, if there is an output contract that tells us that certain properties are preserved. // if so, propagate to the child. - List thisNodesIntProps = getInterestingProperties(); - List props = InterestingProperties.createInterestingPropertiesForInput(thisNodesIntProps, - this, 0); + List inheritedIntProps = getInterestingProperties(); + List props = + InterestingProperties.filterInterestingPropertiesForInput(inheritedIntProps, this, 0); // add the first interesting properties: partitioned and grouped InterestingProperties ip1 = new InterestingProperties(); - ip1.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)this.keyList.clone()); - ip1.getLocalProperties().setGroupedFields(new FieldSet(this.keyList)); - - ip1.getMaximalCosts().setNetworkCost(0); - ip1.getMaximalCosts().setSecondaryStorageCost(0); - - Costs cost = new Costs(); - estimator.getHashPartitioningCost(this.inConn, cost); - ip1.getMaximalCosts().addCosts(cost); - cost = new Costs(); - estimator.getLocalSortCost(this, this.inConn, cost); - ip1.getMaximalCosts().addCosts(cost); + ip1.getGlobalProperties().setPartitioning(PartitionProperty.ANY, this.keys); + ip1.getLocalProperties().setGroupedFields(this.keys); + estimator.addHashPartitioningCost(this.inConn, ip1.getMaximalCosts()); + estimator.addLocalSortCost(this.inConn, -1, ip1.getMaximalCosts()); // add the second interesting properties: partitioned only InterestingProperties ip2 = new InterestingProperties(); - ip2.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)this.keyList.clone()); - - ip2.getMaximalCosts().setNetworkCost(0); - ip2.getMaximalCosts().setSecondaryStorageCost(0); - - estimator.getHashPartitioningCost(this.inConn, cost); - ip2.getMaximalCosts().addCosts(cost); + ip2.getGlobalProperties().setPartitioning(PartitionProperty.ANY, this.keys); + estimator.addHashPartitioningCost(this.inConn, ip2.getMaximalCosts()); InterestingProperties.mergeUnionOfInterestingProperties(props, ip1); InterestingProperties.mergeUnionOfInterestingProperties(props, ip2); - - inConn.addAllInterestingProperties(props); + this.inConn.addAllInterestingProperties(props); } @Override diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SingleInputNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SingleInputNode.java index 9e02aba65ba6689a6f9541ac005e7d09938c7efa..8dc3cd62aec53d938acb08be8a998dd9a79613d6 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SingleInputNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SingleInputNode.java @@ -27,8 +27,11 @@ import eu.stratosphere.pact.common.contract.SingleInputContract; import eu.stratosphere.pact.common.plan.Visitor; import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFields; import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFieldsExcept; +import eu.stratosphere.pact.common.type.Key; import eu.stratosphere.pact.common.util.FieldSet; +import eu.stratosphere.pact.compiler.ColumnWithType; import eu.stratosphere.pact.compiler.CompilerException; +import eu.stratosphere.pact.compiler.OptimizerFieldSet; import eu.stratosphere.pact.compiler.PactCompiler; import eu.stratosphere.pact.compiler.costs.CostEstimator; import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; @@ -49,7 +52,7 @@ public abstract class SingleInputNode extends OptimizerNode { protected FieldSet constantSet; // set of fields that are left unchanged by the stub protected FieldSet notConstantSet; // set of fields that are changed by the stub - protected final FieldSet keyList; // The set of key fields + protected final OptimizerFieldSet keys; // The set of key fields // ------------------------------ @@ -61,7 +64,15 @@ public abstract class SingleInputNode extends OptimizerNode { */ public SingleInputNode(SingleInputContract pactContract) { super(pactContract); - this.keyList = new FieldSet(pactContract.getKeyColumnNumbers(0)); + + this.keys = new OptimizerFieldSet(); + int[] keyPos = pactContract.getKeyColumnNumbers(0); + Class[] keyTypes = pactContract.getKeyClasses(); + if (keyPos != null) { + for (int i = 0; i < keyPos.length; i++) { + this.keys.add(new ColumnWithType(keyPos[i], keyTypes[i])); + } + } } // /** @@ -198,8 +209,8 @@ public abstract class SingleInputNode extends OptimizerNode { * * @return The key fields of this optimizer node. */ - public FieldSet getKeySet() { - return this.keyList; + public OptimizerFieldSet getKeySet() { + return this.keys; } // -------------------------------------------------------------------------------------------- diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java index 608a7abbbcfd36a832e6a51eaf0203cb794a94ca..353a833d9b5b1a4f0d8fcf6b4f2a2415fef4f67e 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java @@ -35,6 +35,7 @@ import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.Costs; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.LocalProperties; +import eu.stratosphere.pact.compiler.OptimizerFieldSet; import eu.stratosphere.pact.compiler.PactCompiler; import eu.stratosphere.pact.compiler.costs.CostEstimator; import eu.stratosphere.pact.runtime.shipping.ShipStrategy.BroadcastSS; @@ -45,19 +46,19 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategy.PartitionHashSS; * A node in the optimizer plan that represents a PACT with a two different inputs, such as MATCH or CROSS. * The two inputs are not substitutable in their sides. * - * @author Stephan Ewen (stephan.ewen@tu-berlin.de) + * @author Stephan Ewen */ public abstract class TwoInputNode extends OptimizerNode { private List cachedPlans; // a cache for the computed alternative plans - protected PactConnection input1 = null; // The first input edge + protected PactConnection input1; // The first input edge - protected PactConnection input2 = null; // The second input edge + protected PactConnection input2; // The second input edge - protected FieldList keySet1; // The set of key fields for the first input (order is relevant!) + protected OptimizerFieldSet keySet1; // The set of key fields for the first input - protected FieldList keySet2; // The set of key fields for the second input (order is relevant!) + protected OptimizerFieldSet keySet2; // The set of key fields for the second input // ------------- Stub Annotations @@ -78,83 +79,83 @@ public abstract class TwoInputNode extends OptimizerNode public TwoInputNode(DualInputContract pactContract) { super(pactContract); - this.keySet1 = new FieldList(pactContract.getKeyColumnNumbers(0)); - this.keySet2 = new FieldList(pactContract.getKeyColumnNumbers(1)); + this.keySet1 = new OptimizerFieldSet(); + this.keySet2 = new OptimizerFieldSet(); } - /** - * Copy constructor to create a copy of a node with different predecessors. The predecessors - * is assumed to be of the same type as in the template node and merely copies with different - * strategies, as they are created in the process of the plan enumeration. - * - * @param template - * The node to create a copy of. - * @param pred1 - * The new predecessor for the first input. - * @param pred2 - * The new predecessor for the second input. - * @param conn1 - * The old connection of the first input to copy properties from. - * @param conn2 - * The old connection of the second input to copy properties from. - * @param globalProps - * The global properties of this copy. - * @param localProps - * The local properties of this copy. - */ - protected TwoInputNode(TwoInputNode template, OptimizerNode pred1, OptimizerNode pred2, PactConnection conn1, - PactConnection conn2, GlobalProperties globalProps, LocalProperties localProps) - { - super(template, globalProps, localProps); - - this.constant1 = template.constant1; - this.constant2 = template.constant2; - this.keySet1 = template.keySet1; - this.keySet2 = template.keySet2; - - if(pred1 != null) { - this.input1 = new PactConnection(conn1, pred1, this); - } - - if(pred2 != null) { - this.input2 = new PactConnection(conn2, pred2, this); - } - - // merge the branchPlan maps according the the template's uncloseBranchesStack - if (template.openBranches != null) - { - if (this.branchPlan == null) { - this.branchPlan = new HashMap(8); - } - - for (UnclosedBranchDescriptor uc : template.openBranches) { - OptimizerNode brancher = uc.branchingNode; - OptimizerNode selectedCandidate = null; - - if(pred1 != null) { - if(pred1.branchPlan != null) { - // predecessor 1 has branching children, see if it got the branch we are looking for - selectedCandidate = pred1.branchPlan.get(brancher); - this.branchPlan.put(brancher, selectedCandidate); - } - } - - if(selectedCandidate == null && pred2 != null) { - if(pred2.branchPlan != null) { - // predecessor 2 has branching children, see if it got the branch we are looking for - selectedCandidate = pred2.branchPlan.get(brancher); - this.branchPlan.put(brancher, selectedCandidate); - } - } - - if (selectedCandidate == null) { - throw new CompilerException( - "Candidates for a node with open branches are missing information about the selected candidate "); - } - } - } - } +// /** +// * Copy constructor to create a copy of a node with different predecessors. The predecessors +// * is assumed to be of the same type as in the template node and merely copies with different +// * strategies, as they are created in the process of the plan enumeration. +// * +// * @param template +// * The node to create a copy of. +// * @param pred1 +// * The new predecessor for the first input. +// * @param pred2 +// * The new predecessor for the second input. +// * @param conn1 +// * The old connection of the first input to copy properties from. +// * @param conn2 +// * The old connection of the second input to copy properties from. +// * @param globalProps +// * The global properties of this copy. +// * @param localProps +// * The local properties of this copy. +// */ +// protected TwoInputNode(TwoInputNode template, OptimizerNode pred1, OptimizerNode pred2, PactConnection conn1, +// PactConnection conn2, GlobalProperties globalProps, LocalProperties localProps) +// { +// super(template, globalProps, localProps); +// +// this.constant1 = template.constant1; +// this.constant2 = template.constant2; +// this.keySet1 = template.keySet1; +// this.keySet2 = template.keySet2; +// +// if(pred1 != null) { +// this.input1 = new PactConnection(conn1, pred1, this); +// } +// +// if(pred2 != null) { +// this.input2 = new PactConnection(conn2, pred2, this); +// } +// +// // merge the branchPlan maps according the the template's uncloseBranchesStack +// if (template.openBranches != null) +// { +// if (this.branchPlan == null) { +// this.branchPlan = new HashMap(8); +// } +// +// for (UnclosedBranchDescriptor uc : template.openBranches) { +// OptimizerNode brancher = uc.branchingNode; +// OptimizerNode selectedCandidate = null; +// +// if(pred1 != null) { +// if(pred1.branchPlan != null) { +// // predecessor 1 has branching children, see if it got the branch we are looking for +// selectedCandidate = pred1.branchPlan.get(brancher); +// this.branchPlan.put(brancher, selectedCandidate); +// } +// } +// +// if(selectedCandidate == null && pred2 != null) { +// if(pred2.branchPlan != null) { +// // predecessor 2 has branching children, see if it got the branch we are looking for +// selectedCandidate = pred2.branchPlan.get(brancher); +// this.branchPlan.put(brancher, selectedCandidate); +// } +// } +// +// if (selectedCandidate == null) { +// throw new CompilerException( +// "Candidates for a node with open branches are missing information about the selected candidate "); +// } +// } +// } +// } // ------------------------------------------------------------------------ @@ -176,26 +177,6 @@ public abstract class TwoInputNode extends OptimizerNode public PactConnection getSecondInConn() { return this.input2; } - - /** - * Sets the PactConnection through which this node receives its first input. - * - * @param conn - * The first input connection. - */ - public void setFirstInConn(PactConnection conn) { - this.input1 = conn; - } - - /** - * Sets the PactConnection through which this node receives its second input. - * - * @param conn - * The second input connection. - */ - public void setSecondInConn(PactConnection conn) { - this.input2 = conn; - } /** * TODO @@ -326,27 +307,27 @@ public abstract class TwoInputNode extends OptimizerNode */ @Override final public List getAlternativePlans(CostEstimator estimator) { - // check if we have a cached version - if (this.cachedPlans != null) { - return this.cachedPlans; - } - - // step down to all producer nodes for first input and calculate alternative plans - List subPlans1 = this.getFirstPredNode().getAlternativePlans(estimator); - - // step down to all producer nodes for second input and calculate alternative plans - List subPlans2 = this.getSecondPredNode().getAlternativePlans(estimator); +// // check if we have a cached version +// if (this.cachedPlans != null) { +// return this.cachedPlans; +// } +// +// // step down to all producer nodes for first input and calculate alternative plans +// List subPlans1 = this.getFirstPredNode().getAlternativePlans(estimator); +// +// // step down to all producer nodes for second input and calculate alternative plans +// List subPlans2 = this.getSecondPredNode().getAlternativePlans(estimator); List outputPlans = new ArrayList(); - computeValidPlanAlternatives(subPlans1, subPlans2, estimator, outputPlans); - - // prune the plans - prunePlanAlternatives(outputPlans); - - // cache the result only if we have multiple outputs --> this function gets invoked multiple times - if (this.getOutConns() != null && this.getOutConns().size() > 1) { - this.cachedPlans = outputPlans; - } +// computeValidPlanAlternatives(subPlans1, subPlans2, estimator, outputPlans); +// +// // prune the plans +// prunePlanAlternatives(outputPlans); +// +// // cache the result only if we have multiple outputs --> this function gets invoked multiple times +// if (this.getOutConns() != null && this.getOutConns().size() > 1) { +// this.cachedPlans = outputPlans; +// } return outputPlans; } @@ -371,11 +352,7 @@ public abstract class TwoInputNode extends OptimizerNode * @return {@code true} if all values are valid, {@code false} otherwise */ protected boolean haveValidOutputEstimates(OptimizerNode subPlan) { - - if(subPlan.getEstimatedOutputSize() == -1) - return false; - else - return true; + return subPlan.getEstimatedOutputSize() != -1; } /* @@ -504,35 +481,40 @@ public abstract class TwoInputNode extends OptimizerNode * @param input The input for which key fields must be returned. * @return the key fields of the given input. */ - public FieldList getInputKeySet(int input) { + public OptimizerFieldSet getInputKeySet(int input) { switch(input) { - case 0: return keySet1; - case 1: return keySet2; - default: throw new IndexOutOfBoundsException(); + case 0: return keySet1; + case 1: return keySet2; + default: throw new IndexOutOfBoundsException(); } } - public boolean isFieldKept(int input, int fieldNumber) { - + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#isFieldConstant(int, int) + */ + @Override + public boolean isFieldConstant(int input, int fieldNumber) { switch(input) { case 0: if (this.constant1 == null) { if (this.notConstant1 == null) { return false; + } else { + return !this.notConstant1.contains(fieldNumber); } - return this.notConstant1.contains(fieldNumber) == false; + } else { + return this.constant1.contains(fieldNumber); } - - return this.constant1.contains(fieldNumber); case 1: if (this.constant2 == null) { if (this.notConstant2 == null) { return false; + } else { + return !this.notConstant2.contains(fieldNumber); } - return this.notConstant2.contains(fieldNumber) == false; + } else { + return this.constant2.contains(fieldNumber); } - - return this.constant2.contains(fieldNumber); default: throw new IndexOutOfBoundsException(); } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java index 2de97a077335ce084090bff1a64c8676e489f633..326a842f011c21e6bf5dc9237d7c44cf38083934 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java @@ -15,13 +15,16 @@ package eu.stratosphere.pact.compiler.plan.candidate; +import eu.stratosphere.pact.compiler.plan.EstimateProvider; +import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; + /** * * * @author Stephan Ewen */ -public class Channel +public class Channel implements EstimateProvider { /** * Enumeration to indicate the mode of temporarily materializing the data that flows across a connection. @@ -38,6 +41,8 @@ public class Channel private final PlanNode target = null; + private ShipStrategyType shipStrategy; + private TempMode tempMode; private int replicationFactor; @@ -50,7 +55,7 @@ public class Channel * @return The source. */ public PlanNode getSource() { - return source; + return this.source; } /** @@ -59,7 +64,11 @@ public class Channel * @return The target. */ public PlanNode getTarget() { - return target; + return this.target; + } + + public ShipStrategyType getShipStrategy() { + return this.shipStrategy; } /** diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java index ff490d31849fbb479cd822b94c8f2814360b8cd5..ad52645f9e78783878d2012927be162580deb6e4 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java @@ -154,10 +154,20 @@ public abstract class PlanNode implements Visitable } } + public int getDegreeOfParallelism() { + return 1; + } + + public long getTotalAvailableMemory() { + return 3 * 1024 * 1024 * getDegreeOfParallelism(); // mock: 3 MiBytes per task + } + // -------------------------------------------------------------------------------------------- // // -------------------------------------------------------------------------------------------- + public abstract Iterator getInputs(); + public abstract Iterator getPredecessors(); // -------------------------------------------------------------------------------------------- diff --git a/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/BranchingPlansCompilerTest.java b/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/BranchingPlansCompilerTest.java index 2333bafbe7090c203aba0b57be7310f27928f150..229c76d4d17be6c260c32c66b8933eaa22e05c92 100644 --- a/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/BranchingPlansCompilerTest.java +++ b/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/BranchingPlansCompilerTest.java @@ -41,7 +41,7 @@ import eu.stratosphere.pact.common.contract.MatchContract; import eu.stratosphere.pact.common.contract.ReduceContract; import eu.stratosphere.pact.common.plan.Plan; import eu.stratosphere.pact.common.type.base.PactInteger; -import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator; +import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator; import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator; import eu.stratosphere.pact.compiler.plan.OptimizedPlan; import eu.stratosphere.pact.compiler.util.DummyCoGroupStub; @@ -83,7 +83,7 @@ public class BranchingPlansCompilerTest { // prepare the statistics DataStatistics dataStats = new DataStatistics(); - this.compiler = new PactCompiler(dataStats, new FixedSizeClusterCostEstimator(), dummyAddress); + this.compiler = new PactCompiler(dataStats, new DefaultCostEstimator(), dummyAddress); } catch (Exception ex) { ex.printStackTrace(); diff --git a/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/PartitionDOPChangeTest.java b/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/PartitionDOPChangeTest.java index 1e2430bead82bcf3932c27262e9e5ac09544693c..735cc86b0c0fd6acbeef2d8fba7911fedc1b661b 100644 --- a/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/PartitionDOPChangeTest.java +++ b/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/PartitionDOPChangeTest.java @@ -36,7 +36,7 @@ import eu.stratosphere.pact.common.contract.ReduceContract; import eu.stratosphere.pact.common.plan.Plan; import eu.stratosphere.pact.common.plan.Visitor; import eu.stratosphere.pact.common.type.base.PactInteger; -import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator; +import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator; import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator; import eu.stratosphere.pact.compiler.plan.MatchNode; import eu.stratosphere.pact.compiler.plan.OptimizedPlan; @@ -75,7 +75,7 @@ public class PartitionDOPChangeTest { // prepare the statistics DataStatistics dataStats = new DataStatistics(); - this.compiler = new PactCompiler(dataStats, new FixedSizeClusterCostEstimator(), dummyAddress); + this.compiler = new PactCompiler(dataStats, new DefaultCostEstimator(), dummyAddress); } catch (Exception ex) { ex.printStackTrace(); diff --git a/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java b/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java index db8ecab42c36acc198ea98681c4e10575a882ef3..b7163d793323bd83bd8848d2388f25218fee4131 100644 --- a/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java +++ b/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java @@ -35,7 +35,7 @@ import eu.stratosphere.pact.common.contract.ReduceContract; import eu.stratosphere.pact.common.plan.Plan; import eu.stratosphere.pact.common.plan.Visitor; import eu.stratosphere.pact.common.type.base.PactInteger; -import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator; +import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator; import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator; import eu.stratosphere.pact.compiler.plan.OptimizedPlan; import eu.stratosphere.pact.compiler.plan.OptimizerNode; @@ -73,7 +73,7 @@ public class UnionPropertyPropagationTest { // prepare the statistics DataStatistics dataStats = new DataStatistics(); - this.compiler = new PactCompiler(dataStats, new FixedSizeClusterCostEstimator(), dummyAddress); + this.compiler = new PactCompiler(dataStats, new DefaultCostEstimator(), dummyAddress); } catch (Exception ex) { ex.printStackTrace(); diff --git a/sopremo/sopremo-server/src/main/java/eu/stratosphere/sopremo/server/SopremoExecutionThread.java b/sopremo/sopremo-server/src/main/java/eu/stratosphere/sopremo/server/SopremoExecutionThread.java index b3a447566eba2bb34ebf1b8ac7d83ef24f2bb827..c0f2fd4d8b01276e1843d148661d503a9b1a1d12 100644 --- a/sopremo/sopremo-server/src/main/java/eu/stratosphere/sopremo/server/SopremoExecutionThread.java +++ b/sopremo/sopremo-server/src/main/java/eu/stratosphere/sopremo/server/SopremoExecutionThread.java @@ -29,7 +29,7 @@ import eu.stratosphere.nephele.util.StringUtils; import eu.stratosphere.pact.common.plan.Plan; import eu.stratosphere.pact.compiler.DataStatistics; import eu.stratosphere.pact.compiler.PactCompiler; -import eu.stratosphere.pact.compiler.costs.FixedSizeClusterCostEstimator; +import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator; import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator; import eu.stratosphere.pact.compiler.plan.OptimizedPlan; import eu.stratosphere.sopremo.execution.ExecutionResponse.ExecutionState; @@ -154,7 +154,7 @@ public class SopremoExecutionThread implements Runnable { JobGraph getJobGraph(final Plan pactPlan) { PactCompiler compiler = - new PactCompiler(new DataStatistics(), new FixedSizeClusterCostEstimator(), this.jobManagerAddress); + new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), this.jobManagerAddress); final OptimizedPlan optPlan = compiler.compile(pactPlan); JobGraphGenerator gen = new JobGraphGenerator();