diff --git a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/contract/Ordering.java b/pact/pact-common/src/main/java/eu/stratosphere/pact/common/contract/Ordering.java index 82fb9033b95681f912e896fad252c974aa3a4d2f..5031ea7df32a22485d5056132dff7a0c4507d2ef 100644 --- a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/contract/Ordering.java +++ b/pact/pact-common/src/main/java/eu/stratosphere/pact/common/contract/Ordering.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import eu.stratosphere.pact.common.type.Key; import eu.stratosphere.pact.common.util.FieldList; -import eu.stratosphere.pact.common.util.FieldSet; /** * @@ -164,10 +163,6 @@ public class Ordering return true; } - public boolean groupsFieldSet(FieldSet fieldSet) { - return fieldSet.isValidSubset(this.indexes); - } - /** * Creates a new ordering the represents an ordering on a prefix of the fields. If the * exclusive index up to which to create the ordering is 0, then there is diff --git a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/util/FieldSet.java b/pact/pact-common/src/main/java/eu/stratosphere/pact/common/util/FieldSet.java index 13bfb3264668f549034ad61b8bfd8c922e8045db..ee6fca801153636b482a06d1b5b2990fe5a0a1b6 100644 --- a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/util/FieldSet.java +++ b/pact/pact-common/src/main/java/eu/stratosphere/pact/common/util/FieldSet.java @@ -97,6 +97,15 @@ public class FieldSet implements Iterable return this.collection.iterator(); } + public int[] toArray() { + int[] a = new int[this.collection.size()]; + int i = 0; + for (int col : this.collection) { + a[i++] = col; + } + return a; + } + // -------------------------------------------------------------------------------------------- /** diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/ColumnWithType.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/ColumnWithType.java deleted file mode 100644 index 
4b4d17bc0a672a339a3a4b7490b0537d85cbecd1..0000000000000000000000000000000000000000 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/ColumnWithType.java +++ /dev/null @@ -1,88 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ - -package eu.stratosphere.pact.compiler; - -import eu.stratosphere.pact.common.type.Key; - - -/** - * - */ -public class ColumnWithType -{ - private final Class columnType; - - private final int columnIndex; - - public ColumnWithType(int columnIndex, Class columnType) { - this.columnType = columnType; - this.columnIndex = columnIndex; - } - - /** - * Gets the columnType from this column. - * - * @return The column type. - */ - public Class getColumnType() { - return columnType; - } - - /** - * Gets the column index from this column. - * - * @return The column index. 
- */ - public int getColumnIndex() { - return columnIndex; - } - - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - return this.columnIndex; - } - - /* (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) - { - if (obj != null && obj instanceof ColumnWithType) { - final ColumnWithType cwt = (ColumnWithType) obj; - if (this.columnIndex == cwt.columnIndex) { - if (this.columnType != cwt.columnType) { - // DEBUG ONLY!!! Should never happen, if used properly. - throw new RuntimeException("Comparing columns with equal position but incompatible type."); - } - return true; - } - } - - return false; - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - return "Column " + this.columnIndex + " (" + this.columnType.getName() + ')'; - } -} \ No newline at end of file diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/CompilerException.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/CompilerException.java index 89003a2481139966807667bfafc7cf57e30f1bf1..2172bde6862682714923db6bb49cb89e70f468bd 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/CompilerException.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/CompilerException.java @@ -18,7 +18,7 @@ package eu.stratosphere.pact.compiler; /** * An exception that is thrown by the pact compiler when encountering an illegal condition. 
* - * @author Stephan Ewen (stephan.ewen@tu-berlin.de) + * @author Stephan Ewen */ public class CompilerException extends RuntimeException { @@ -64,5 +64,4 @@ public class CompilerException extends RuntimeException { public CompilerException(String message, Throwable cause) { super(message, cause); } - } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/GlobalProperties.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/GlobalProperties.java index 70cdeff093e484faa6d3c9810464a2f195d968ee..abb77a9f3584d7d5f1c44f268180be9a7a87469f 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/GlobalProperties.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/GlobalProperties.java @@ -17,7 +17,10 @@ package eu.stratosphere.pact.compiler; import eu.stratosphere.pact.common.contract.Order; import eu.stratosphere.pact.common.contract.Ordering; +import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.plan.OptimizerNode; +import eu.stratosphere.pact.compiler.plan.candidate.Channel; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; /** * This class represents global properties of the data. Global properties are properties that @@ -28,9 +31,9 @@ import eu.stratosphere.pact.compiler.plan.OptimizerNode; */ public final class GlobalProperties implements Cloneable { - private PartitionProperty partitioning; // the type partitioning + private PartitioningProperty partitioning; // the type partitioning - private OptimizerFieldSet partitioningFields; // the fields which are partitioned + private FieldSet partitioningFields; // the fields which are partitioned private Ordering ordering; // order of the partitioned fields, if it is an ordered (range) range partitioning @@ -40,52 +43,65 @@ public final class GlobalProperties implements Cloneable * Initializes the global properties with no partitioning. 
*/ public GlobalProperties() { - this.partitioning = PartitionProperty.NONE; - } - - /** - * @param partitioning - * @param ordering - */ - public GlobalProperties(PartitionProperty partitioning, Ordering ordering) { - this.partitioning = partitioning; - this.ordering = ordering; + this.partitioning = PartitioningProperty.RANDOM; } /** - * @param partitioning - * @param partitioningFields + * @param partitioning2 + * @param newFields */ - public GlobalProperties(PartitionProperty partitioning, OptimizerFieldSet partitioningFields) { + private GlobalProperties(PartitioningProperty partitioning, FieldSet fields) { this.partitioning = partitioning; - this.partitioningFields = partitioningFields; + this.partitioningFields = fields; } - + // -------------------------------------------------------------------------------------------- /** * Sets the partitioning property for the global properties. * - * @param partitioning - * The new partitioning to set. + * @param partitioning The new partitioning to set. 
+ * @param partitionedFields */ - public void setPartitioning(PartitionProperty partitioning, OptimizerFieldSet partitionedFields) { - this.partitioning = partitioning; + public void setHashPartitioned(FieldSet partitionedFields) { + this.partitioning = PartitioningProperty.HASH_PARTITIONED; this.partitioningFields = partitionedFields; + this.ordering = null; } - public void setPartitioning(PartitionProperty partitioning, Ordering ordering) { - this.partitioning = partitioning; + public void setRangePartitioned(Ordering ordering) { + this.partitioning = PartitioningProperty.RANGE_PARTITIONED; this.ordering = ordering; + this.partitioningFields = null; } + + public void setAnyPartitioning(FieldSet partitionedFields) { + this.partitioning = PartitioningProperty.ANY_PARTITIONING; + this.partitioningFields = partitionedFields; + this.ordering = null; + } + + public void setRandomDistribution() { + this.partitioning = PartitioningProperty.RANDOM; + this.partitioningFields = null; + this.ordering = null; + } + + public void setFullyReplicated() { + this.partitioning = PartitioningProperty.FULL_REPLICATION; + this.partitioningFields = null; + this.ordering = null; + } + + /** * Gets the partitioning property. * * @return The partitioning property. */ - public PartitionProperty getPartitioning() { + public PartitioningProperty getPartitioning() { return partitioning; } @@ -94,7 +110,7 @@ public final class GlobalProperties implements Cloneable * * @return The partitioning fields. */ - public OptimizerFieldSet getPartitionedFields() { + public FieldSet getPartitionedFields() { return this.partitioningFields; } @@ -111,14 +127,14 @@ public final class GlobalProperties implements Cloneable * Checks, if the properties in this object are trivial, i.e. only standard values. 
*/ public boolean isTrivial() { - return partitioning == PartitionProperty.NONE; + return partitioning == PartitioningProperty.RANDOM; } /** * This method resets the properties to a state where no properties are given. */ public void reset() { - this.partitioning = PartitionProperty.NONE; + this.partitioning = PartitioningProperty.RANDOM; this.ordering = null; this.partitioningFields = null; } @@ -140,8 +156,8 @@ public final class GlobalProperties implements Cloneable } } } else if (this.partitioningFields != null) { - for (ColumnWithType col : this.partitioningFields) { - if (!node.isFieldConstant(input, col.getColumnIndex())) { + for (int colIndex : this.partitioningFields) { + if (!node.isFieldConstant(input, colIndex)) { return null; } } @@ -163,17 +179,17 @@ public final class GlobalProperties implements Cloneable else if (partitioningFields != null) { boolean allIn = true; boolean atLeasOneIn = false; - for (ColumnWithType col : this.partitioningFields) { - boolean res = node.isFieldConstant(input, col.getColumnIndex()); + for (int col : this.partitioningFields) { + boolean res = node.isFieldConstant(input, col); allIn &= res; atLeasOneIn |= res; } if (allIn) { return this; } else if (atLeasOneIn) { - OptimizerFieldSet newFields = new OptimizerFieldSet(); - for (ColumnWithType nc : this.partitioningFields) { - if (node.isFieldConstant(input, nc.getColumnIndex())) { + FieldSet newFields = new FieldSet(); + for (Integer nc : this.partitioningFields) { + if (node.isFieldConstant(input, nc)) { newFields.add(nc); } } @@ -196,12 +212,16 @@ public final class GlobalProperties implements Cloneable */ public boolean isMetBy(GlobalProperties other) { - if (this.partitioning == PartitionProperty.NONE) { + if (this.partitioning == PartitioningProperty.FULL_REPLICATION || other.partitioning == PartitioningProperty.FULL_REPLICATION) { + return other.partitioning == this.partitioning; + } + + if (this.partitioning == PartitioningProperty.RANDOM) { return true; } - if 
(this.partitioning == PartitionProperty.ANY) { - if (other.partitioning == PartitionProperty.NONE) { + if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) { + if (other.partitioning == PartitioningProperty.RANDOM) { return false; } } else if (other.partitioning != this.partitioning) { @@ -233,6 +253,30 @@ public final class GlobalProperties implements Cloneable throw new RuntimeException("Found a partitioning property, but no fields."); } } + + /** + * Parameterizes the ship strategy fields of a channel such that the channel produces the desired global properties. + * + * @param channel The channel to parameterize. + */ + public void parameterizeChannel(Channel channel) + { + if (this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM) { + channel.setShipStrategy(ShipStrategyType.FORWARD); + } else switch (this.partitioning) { + case FULL_REPLICATION: + channel.setShipStrategy(ShipStrategyType.BROADCAST); + case ANY_PARTITIONING: + case HASH_PARTITIONED: + channel.setShipStrategy(ShipStrategyType.PARTITION_HASH, Utils.createOrderedFromSet(this.partitioningFields)); + break; + case RANGE_PARTITIONED: + channel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections()); + break; + default: + throw new CompilerException(); + } + } // ------------------------------------------------------------------------ diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/LocalProperties.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/LocalProperties.java index 2dc8e517ead344f0d0f419c3ed95f42cc4632615..b5afe7167aa673a5ce17f9bd47f66799bd0653f9 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/LocalProperties.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/LocalProperties.java @@ -19,6 +19,8 @@ import eu.stratosphere.pact.common.contract.Ordering; import 
eu.stratosphere.pact.common.util.FieldList; import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.plan.OptimizerNode; +import eu.stratosphere.pact.compiler.plan.candidate.Channel; +import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; /** * This class represents local properties of the data. A local property is a property that exists @@ -28,9 +30,9 @@ import eu.stratosphere.pact.compiler.plan.OptimizerNode; */ public final class LocalProperties implements Cloneable { - private Ordering ordering; // order inside a partition, null if not ordered + private Ordering ordering; // order inside a partition, null if not ordered - private OptimizerFieldSet groupedFields; // fields by which the stream is grouped. null if not grouped. + private FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped. private FieldSet uniqueFields; // fields whose value combination is unique in the stream @@ -52,7 +54,7 @@ public final class LocalProperties implements Cloneable * @param groupedFields The grouped fields for these local properties. * @param uniqueFields The unique fields for these local properties. */ - private LocalProperties(Ordering ordering, OptimizerFieldSet groupedFields, FieldSet uniqueFields) { + private LocalProperties(Ordering ordering, FieldSet groupedFields, FieldSet uniqueFields) { this.ordering = ordering; this.groupedFields = groupedFields; this.uniqueFields = uniqueFields; @@ -84,7 +86,7 @@ public final class LocalProperties implements Cloneable * * @return The grouped fields, or null if nothing is grouped. */ - public OptimizerFieldSet getGroupedFields() { + public FieldSet getGroupedFields() { return this.groupedFields; } @@ -93,7 +95,7 @@ public final class LocalProperties implements Cloneable * * @param groupedFields The fields that are grouped in these data properties. 
*/ - public void setGroupedFields(OptimizerFieldSet groupedFields) { + public void setGroupedFields(FieldSet groupedFields) { this.groupedFields = groupedFields; } @@ -226,11 +228,29 @@ public final class LocalProperties implements Cloneable if (this.uniqueFields != null) { // we demand field uniqueness - throw new RuntimeException("Uniqueness as a required property is not supported."); + throw new CompilerException("Uniqueness as a required property is not supported."); } return true; } + + /** + * Parameterizes the local strategy fields of a channel such that the channel produces the desired local properties. + * + * @param channel The channel to parameterize. + */ + public void parameterizeChannel(Channel channel) + { + if (this.uniqueFields != null) { + throw new CompilerException("Uniqueness as a required property is not supported."); + } + + if (this.ordering != null) { + channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections()); + } else if (this.groupedFields != null) { + channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields)); + } + } // ------------------------------------------------------------------------ @@ -286,23 +306,14 @@ public final class LocalProperties implements Cloneable public LocalProperties clone() { LocalProperties newProps = new LocalProperties(); if (this.ordering != null) { - newProps.ordering = this.ordering.clone(); + newProps.ordering = this.ordering.clone(); } if (this.groupedFields != null) { - newProps.groupedFields = this.groupedFields.clone(); + newProps.groupedFields = this.groupedFields.clone(); } if (this.uniqueFields != null) { - newProps.uniqueFields = this.uniqueFields.clone(); + newProps.uniqueFields = this.uniqueFields.clone(); } return newProps; } - - /** - * Convenience method to create copies without the cloning exception. - * - * @return A perfect deep copy of this object. 
- */ - public final LocalProperties createCopy() { - return this.clone(); - } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/OptimizerFieldList.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/OptimizerFieldList.java deleted file mode 100644 index 8b0d89d6574a9d28ca565ff26d5d0bd4d85c3899..0000000000000000000000000000000000000000 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/OptimizerFieldList.java +++ /dev/null @@ -1,115 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- * - **********************************************************************************************************************/ - -package eu.stratosphere.pact.compiler; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import eu.stratosphere.pact.common.contract.Ordering; - - -/** - * - */ -public class OptimizerFieldList extends OptimizerFieldSet -{ - public OptimizerFieldList() { - super(); - } - - // -------------------------------------------------------------------------------------------- - - public ColumnWithType get(int pos) { - return get().get(pos); - } - - // -------------------------------------------------------------------------------------------- - - /* (non-Javadoc) - * @see eu.stratosphere.pact.common.util.FieldSet#isValidSubset(eu.stratosphere.pact.common.util.FieldSet) - */ - @Override - public boolean isValidSubset(OptimizerFieldSet set) { - if (set instanceof OptimizerFieldList) { - return (isValidSubset((OptimizerFieldList) set)); - } else { - return false; - } - } - - public boolean isValidSubset(OptimizerFieldList list) { - if (list.size() > size()) { - return false; - } - final List myList = get(); - final List theirList = list.get(); - for (int i = 0; i < theirList.size(); i++) { - ColumnWithType myInt = myList.get(i); - ColumnWithType theirInt = theirList.get(i); - if (!myInt.equals(theirInt)) { - return false; - } - } - return true; - } - - public boolean isValidUnorderedPrefix(OptimizerFieldSet set) { - if (set.size() > size()) { - return false; - } - - List list = get(); - for (int i = 0; i < set.size(); i++) { - if (!set.contains(list.get(i))) { - return false; - } - } - return true; - } - - // -------------------------------------------------------------------------------------------- - - /* (non-Javadoc) - * @see eu.stratosphere.pact.common.util.AbstractFieldSet#initCollection() - */ - @Override - protected Collection initCollection() { - return new ArrayList(); - } - - /* (non-Javadoc) - * @see 
eu.stratosphere.pact.common.util.FieldSet#getDescriptionPrefix() - */ - @Override - protected String getDescriptionPrefix() { - return "Field List"; - } - - private List get() { - return (List) this.collection; - } - - // -------------------------------------------------------------------------------------------- - - public static OptimizerFieldList getFromOrdering(Ordering ordering) { - final OptimizerFieldList list = new OptimizerFieldList(); - for (int i = 0; i < ordering.getNumberOfFields(); i++) { - list.add(ordering.getFieldNumber(i), ordering.getType(i)); - } - return list; - } -} diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/OptimizerFieldSet.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/OptimizerFieldSet.java deleted file mode 100644 index 514d2523c0cb58c3ed3dbedbc7a8297de291655f..0000000000000000000000000000000000000000 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/OptimizerFieldSet.java +++ /dev/null @@ -1,158 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- * - **********************************************************************************************************************/ - -package eu.stratosphere.pact.compiler; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; - -import eu.stratosphere.pact.common.type.Key; - - -/** - * - */ -public class OptimizerFieldSet implements Iterable -{ - protected final Collection collection; - - // -------------------------------------------------------------------------------------------- - - public OptimizerFieldSet() { - this.collection = initCollection(); - } - - // -------------------------------------------------------------------------------------------- - - public void add(ColumnWithType columnIndex) { - this.collection.add(columnIndex); - } - - public void add(int columnIndex, Class columnType) { - add(new ColumnWithType(columnIndex, columnType)); - } - - public void addAll(Collection columnIndexes) { - this.collection.addAll(columnIndexes); - } - - public void addAll(OptimizerFieldSet set) { - for (ColumnWithType i : set) { - add(i); - } - } - - public boolean contains(ColumnWithType columnIndex) { - return this.collection.contains(columnIndex); - } - - public int size() { - return this.collection.size(); - } - - /* (non-Javadoc) - * @see java.lang.Iterable#iterator() - */ - @Override - public Iterator iterator() { - return this.collection.iterator(); - } - - // -------------------------------------------------------------------------------------------- - - /** - * Checks if the given set of fields is a valid subset of this set of fields. For unordered - * sets, this is the case if all of the given set's fields are also part of this field. - *

- * Subclasses that describe field sets where the field order matters must override this method - * to implement a field ordering sensitive check. - * - * @param set The set that is a candidate subset. - * @return True, if the given set is a subset of this set, false otherwise. - */ - public boolean isValidSubset(OptimizerFieldSet set) { - if (set.size() > size()) { - return false; - } - for (ColumnWithType i : set) { - if (!contains(i)) { - return false; - } - } - return true; - } - - // -------------------------------------------------------------------------------------------- - - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - return this.collection.hashCode(); - } - - /* (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (obj instanceof OptimizerFieldSet) { - return this.collection.equals(((OptimizerFieldSet) obj).collection); - } else { - return false; - } - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append(getDescriptionPrefix()).append(' ').append('['); - for (ColumnWithType i : this.collection) { - bld.append(i); - bld.append(','); - bld.append(' '); - } - bld.setLength(bld.length() - 2); - bld.append(']'); - return bld.toString(); - } - - /* (non-Javadoc) - * @see java.lang.Object#clone() - */ - public OptimizerFieldSet clone() { - OptimizerFieldSet set = new OptimizerFieldSet(); - set.addAll(this.collection); - return set; - } - - // -------------------------------------------------------------------------------------------- - - protected Collection initCollection() { - return new HashSet(); - } - - protected String getDescriptionPrefix() { - return "Field Set"; - } -} diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java 
b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java index 47ac974acbec033b398d69b28bfc2d619d4d349c..9225274f0e494a19ac9cc17592d93206bd8330f3 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java @@ -57,6 +57,7 @@ import eu.stratosphere.pact.compiler.plan.OptimizedPlan; import eu.stratosphere.pact.compiler.plan.OptimizerNode; import eu.stratosphere.pact.compiler.plan.ReduceNode; import eu.stratosphere.pact.compiler.plan.SinkJoiner; +import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; /** * The optimizer that takes the user specified pact plan and creates an optimized plan that contains @@ -670,13 +671,13 @@ public class PactCompiler { BranchesVisitor branchingVisitor = new BranchesVisitor(); rootNode.accept(branchingVisitor); -// -// // the final step is now to generate the actual plan alternatives -// List bestPlan = rootNode.getAlternativePlans(this.costEstimator); -// -// if (bestPlan.size() != 1) { -// throw new CompilerException("Error in compiler: more than one best plan was created!"); -// } + + // the final step is now to generate the actual plan alternatives + List bestPlan = rootNode.getAlternativePlans(this.costEstimator); + + if (bestPlan.size() != 1) { + throw new CompilerException("Error in compiler: more than one best plan was created!"); + } // // // check if the best plan's root is a data sink (single sink plan) // // if so, directly take it. if it is a sink joiner node, get its contained sinks @@ -899,7 +900,7 @@ public class PactCompiler { // that computation must happen during the last descend. 
if (node.haveAllOutputConnectionInterestingProperties() && node.getInterestingProperties() == null) { - node.computeInterestingProperties(); + node.computeUnionOfInterestingPropertiesFromSuccessors(); node.computeInterestingPropertiesForInputs(this.estimator); return true; } else { diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PartitionProperty.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PartitioningProperty.java similarity index 67% rename from pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PartitionProperty.java rename to pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PartitioningProperty.java index 537d12b609a61d7947fb4459fa04fd358d9fad81..196718790159ca93694f22d0807cb298af2b3bbc 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PartitionProperty.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PartitioningProperty.java @@ -16,15 +16,16 @@ package eu.stratosphere.pact.compiler; /** - * An enumeration tracking the different types of partitioning. + * An enumeration tracking the different types of sharding strategies. * - * @author Stephan Ewen (stephan.ewen@tu-berlin.de) + * @author Stephan Ewen */ -public enum PartitionProperty { +public enum PartitioningProperty { + /** - * Constant indicating no partitioning. + * Constant indicating no particular partitioning (i.e. random) */ - NONE, + RANDOM, /** * Constant indicating a hash partitioning. @@ -39,17 +40,41 @@ public enum PartitionProperty { /** * Constant indicating any not further specified partitioning. */ - ANY; + ANY_PARTITIONING, + + /** + * Constant indicating full replication of the data. + */ + FULL_REPLICATION; /** * Checks, if this property represents in fact a partitioning. That is, - * whether this property is not equal to PartitionProperty.NONE. + * whether this property is not equal to PartitionProperty.FULL_REPLICATION. 
* - * @return True, if this enum constant is unequal to PartitionProperty.NONE, + * @return True, if this enum constant is unequal to PartitionProperty.FULL_REPLICATION, * false otherwise. */ public boolean isPartitioned() { - return this != PartitionProperty.NONE; + return this != FULL_REPLICATION; + } + + /** + * Checks, if this property represents a full replication. + * + * @return True, if this enum constant is equal to PartitionProperty.FULL_REPLICATION, + * false otherwise. + */ + public boolean isReplication() { + return this == FULL_REPLICATION; + } + + /** + * Checks if this property presents a partitioning that is not random, but on a partitioning key. + * + * @return True, if the data is partitioned on a key. + */ + public boolean isPartitionedOnKey() { + return isPartitioned() && this != RANDOM; } /** @@ -63,17 +88,6 @@ public enum PartitionProperty { * @return True, if this enum constant is a re-computable partitioning. */ public boolean isComputablyPartitioned() { - return this == HASH_PARTITIONED || this == PartitionProperty.RANGE_PARTITIONED; - } - - /** - * Checks whether this partition property is compatible with another one. - * - * @param other - * The other partition property to check against. - * @return True, if this partition property is compatible with the other, false if not. 
- */ - public boolean isCompatibleWith(PartitionProperty other) { - return other == this && this != ANY && this != NONE; + return this == HASH_PARTITIONED || this == RANGE_PARTITIONED; } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/Utils.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/Utils.java new file mode 100644 index 0000000000000000000000000000000000000000..df7ed6e5207340342088339e5ef7152fc8cb0368 --- /dev/null +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/Utils.java @@ -0,0 +1,43 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.compiler; + +import java.util.Arrays; + +import eu.stratosphere.pact.common.util.FieldList; +import eu.stratosphere.pact.common.util.FieldSet; + + +/** + * + * + * @author Stephan Ewen + */ +public class Utils +{ + public static final FieldList createOrderedFromSet(FieldSet set) { + final int[] cols = set.toArray(); + Arrays.sort(cols); + return new FieldList(cols); + } + + // -------------------------------------------------------------------------------------------- + + /** + * No instantiation. 
+ */ + private Utils() {} +} diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java index 138d765759dc66cf037ba198656b6509af029eee..41a9431ad3f2e8472c6c235e70d953c6c3a96d29 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java @@ -83,8 +83,6 @@ public abstract class CostEstimator { case BROADCAST: addBroadcastCost(channel, channel.getReplicationFactor(), costs); break; - case SFR: - throw new CompilerException("Symmetric-Fragment-And-Replicate Strategy currently not supported."); default: throw new CompilerException("Unknown shipping strategy for input: " + channel.getShipStrategy()); } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/jobgen/JSONGenerator.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/jobgen/JSONGenerator.java index 507ee1760d68fb2eb2d35c65791b948a0b6a2868..009ab5213383757445c3ae4240395b3e8628072c 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/jobgen/JSONGenerator.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/jobgen/JSONGenerator.java @@ -30,7 +30,7 @@ import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.LocalProperties; -import eu.stratosphere.pact.compiler.PartitionProperty; +import eu.stratosphere.pact.compiler.PartitioningProperty; import eu.stratosphere.pact.compiler.plan.OptimizedPlan; import eu.stratosphere.pact.compiler.plan.OptimizerNode; import eu.stratosphere.pact.compiler.plan.PactConnection; @@ -345,7 +345,7 @@ public class JSONGenerator implements Visitor { this.jsonString.append(",\n\t\t\"global_properties\": 
[\n"); addProperty(jsonString, "Partitioning", gp.getPartitioning().name(), true); - if (gp.getPartitioning() != PartitionProperty.NONE) { + if (gp.getPartitioning() != PartitioningProperty.NONE) { addProperty(jsonString, "Partitioned on", gp.getPartitionedFields().toString(), false); } if (gp.getOrdering() != null) { diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/CoGroupNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/CoGroupNode.java index 1bcb9c29f143b25271616dfabf3c8e8e830462b0..436fbf71b2108f9b7f6147d72b7265b0d15aaea8 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/CoGroupNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/CoGroupNode.java @@ -33,7 +33,7 @@ import eu.stratosphere.pact.compiler.DataStatistics; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.LocalProperties; import eu.stratosphere.pact.compiler.PactCompiler; -import eu.stratosphere.pact.compiler.PartitionProperty; +import eu.stratosphere.pact.compiler.PartitioningProperty; import eu.stratosphere.pact.compiler.costs.CostEstimator; import eu.stratosphere.pact.runtime.shipping.ShipStrategy; import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ForwardSS; @@ -204,7 +204,7 @@ public class CoGroupNode extends TwoInputNode { } // partition and any order - p.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)keyFields.clone()); + p.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, (FieldList)keyFields.clone()); Ordering ordering = new Ordering(); for (Integer index : getPactContract().getKeyColumnNumbers(inputNum)) { @@ -221,7 +221,7 @@ public class CoGroupNode extends TwoInputNode { // partition only p = new InterestingProperties(); - p.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)keyFields.clone()); + p.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, 
(FieldList)keyFields.clone()); estimator.getHashPartitioningCost(input, p.getMaximalCosts()); InterestingProperties.mergeUnionOfInterestingProperties(target, p); } @@ -277,14 +277,14 @@ public class CoGroupNode extends TwoInputNode { // 2 alternatives: // 1) re-partition 2 the same way as 1 // 2) re-partition 1 the same way as 2 - if (gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED - && gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + if (gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED + && gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new ForwardSS(), new PartitionHashSS(this.keySet2), estimator); createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1), new ForwardSS(), estimator); - } else if (gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED - && gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + } else if (gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED + && gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new ForwardSS(), new PartitionRangeSS(this.keySet2), estimator); createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), @@ -307,7 +307,7 @@ public class CoGroupNode extends TwoInputNode { // we create an additional plan with a range partitioning // if this is not already a range partitioning - if (gp1.getPartitioning() != PartitionProperty.RANGE_PARTITIONED) { + if (gp1.getPartitioning() != PartitioningProperty.RANGE_PARTITIONED) { // createCoGroupAlternative(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE, // ShipStrategy.PARTITION_RANGE, estimator); } @@ -317,12 +317,12 @@ public class CoGroupNode extends TwoInputNode { // add two plans: // 1) make input 2 the same partitioning as input 1 // 2) partition both inputs with a different 
partitioning function (hash <-> range) - if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { createCoGroupAlternative(outputPlans, subPlan1, subPlan2, ss1, new PartitionHashSS(this.keySet2), estimator); // createCoGroupAlternative(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE, // ShipStrategy.PARTITION_RANGE, estimator); - } else if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { createCoGroupAlternative(outputPlans, subPlan1, subPlan2, ss1, new PartitionRangeSS(this.keySet2), estimator); createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), @@ -339,12 +339,12 @@ public class CoGroupNode extends TwoInputNode { // add two plans: // 1) make input 1 the same partitioning as input 2 // 2) partition both inputs with a different partitioning function (hash <-> range) - if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), ss2, estimator); // createCoGroupAlternative(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE, // ShipStrategy.PARTITION_RANGE, estimator); - } else if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1), ss2, estimator); 
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), @@ -375,10 +375,10 @@ public class CoGroupNode extends TwoInputNode { case FORWARD: if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning().isPartitioned()) { // adapt to the partitioning - if (gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + if (gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { //TODO check other input for partitioining ss1 = new PartitionHashSS(this.keySet1); - } else if (gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { ss1 = new PartitionRangeSS(this.keySet1); } else { throw new CompilerException(); @@ -389,11 +389,11 @@ public class CoGroupNode extends TwoInputNode { } break; case PARTITION_HASH: - ss1 = (partitioningIsOnSameSubkey(gp1.getPartitionedFields(), this.keySet2) && gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) ? new ForwardSS() + ss1 = (partitioningIsOnSameSubkey(gp1.getPartitionedFields(), this.keySet2) && gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) ? new ForwardSS() : new PartitionHashSS(this.keySet1); break; case PARTITION_RANGE: - ss1 = (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) ? new ForwardSS() + ss1 = (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) ? 
new ForwardSS() : new PartitionRangeSS(this.keySet1); break; default: @@ -417,9 +417,9 @@ public class CoGroupNode extends TwoInputNode { case FORWARD: if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning().isPartitioned()) { // adapt to the partitioning - if (gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + if (gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { ss2 = new PartitionHashSS(this.keySet2); - } else if (gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { ss2 = new PartitionRangeSS(this.keySet2); } else { throw new CompilerException(); @@ -430,11 +430,11 @@ public class CoGroupNode extends TwoInputNode { } break; case PARTITION_HASH: - ss2 = (partitioningIsOnSameSubkey(this.keySet1, gp2.getPartitionedFields()) && gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) ? new ForwardSS() + ss2 = (partitioningIsOnSameSubkey(this.keySet1, gp2.getPartitionedFields()) && gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) ? new ForwardSS() : new PartitionHashSS(this.keySet2); break; case PARTITION_RANGE: - ss2 = (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) ? new ForwardSS() + ss2 = (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) ? 
new ForwardSS() : new PartitionRangeSS(this.keySet2); break; default: @@ -740,7 +740,7 @@ public class CoGroupNode extends TwoInputNode { throw new CompilerException("Invalid input number "+inputNum+" for CoGroup."); } - if (gp.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + if (gp.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { return keyFields.equals(partitionedFields); } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java index 6e55a1fe0bbb1fb8bdc09816303c4980c616dc75..a7fc9cf5106d2c4f1d46d77e14da81cbf79b829d 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java @@ -26,7 +26,7 @@ import eu.stratosphere.pact.common.contract.Ordering; import eu.stratosphere.pact.common.plan.Visitor; import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.DataStatistics; -import eu.stratosphere.pact.compiler.PartitionProperty; +import eu.stratosphere.pact.compiler.PartitioningProperty; import eu.stratosphere.pact.compiler.costs.CostEstimator; import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; @@ -228,7 +228,7 @@ public class DataSinkNode extends OptimizerNode { // in both cases create a range partitioned only IP InterestingProperties partitioningProps = new InterestingProperties(); - partitioningProps.getGlobalProperties().setPartitioning(PartitionProperty.RANGE_PARTITIONED, partitioning); + partitioningProps.getGlobalProperties().setPartitioning(PartitioningProperty.RANGE_PARTITIONED, partitioning); estimator.addRangePartitionCost(this.input, partitioningProps.getMaximalCosts()); this.input.addInterestingProperties(partitioningProps); @@ -241,7 +241,7 @@ public class DataSinkNode extends OptimizerNode { // global sort case: 
create IP for range partitioned and sorted InterestingProperties globalSortProps = new InterestingProperties(); - globalSortProps.getGlobalProperties().setPartitioning(PartitionProperty.RANGE_PARTITIONED, partitioning); + globalSortProps.getGlobalProperties().setPartitioning(PartitioningProperty.RANGE_PARTITIONED, partitioning); estimator.addRangePartitionCost(this.input, globalSortProps.getMaximalCosts()); globalSortProps.getLocalProperties().setOrdering(partitioning); diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSourceNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSourceNode.java index ae40f1b68ad2e37469810945ee1f231a3c1b868a..a7d3db850691b345826d80890b025f0f2ba047f5 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSourceNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSourceNode.java @@ -15,6 +15,7 @@ package eu.stratosphere.pact.compiler.plan; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -27,19 +28,24 @@ import eu.stratosphere.pact.common.generic.io.InputFormat; import eu.stratosphere.pact.common.io.statistics.BaseStatistics; import eu.stratosphere.pact.common.plan.Visitor; import eu.stratosphere.pact.common.util.FieldSet; +import eu.stratosphere.pact.compiler.Costs; import eu.stratosphere.pact.compiler.DataStatistics; import eu.stratosphere.pact.compiler.PactCompiler; import eu.stratosphere.pact.compiler.costs.CostEstimator; +import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; +import eu.stratosphere.pact.compiler.plan.candidate.SourcePlanNode; import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; /** - * The Optimizer representation of a data source. + * The optimizer's internal representation of a data source. 
* * @author Stephan Ewen */ public class DataSourceNode extends OptimizerNode { - private long inputSize; //the size of the input in bytes + private List candidate; // the candidate (there can only be one) for this node + + private long inputSize; //the size of the input in bytes /** * Creates a new DataSourceNode for the given contract. @@ -249,38 +255,23 @@ public class DataSourceNode extends OptimizerNode * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeAlternativePlans() */ @Override - public List getAlternativePlans(CostEstimator estimator) { -// if (this.cachedPlans != null) { -// return this.cachedPlans; -// } -// -// GlobalProperties gp = new GlobalProperties(); -// LocalProperties lp = new LocalProperties(); -// -// // first, compute the properties of the output -//// if (getOutputContract() == OutputContract.UniqueKey) { -//// gp.setKeyUnique(true); -//// gp.setPartitioning(PartitionProperty.ANY); -//// -//// lp.setKeyUnique(true); -//// lp.setKeysGrouped(true); -//// } -// -// DataSourceNode candidate = new DataSourceNode(this, gp, lp); -// -// // compute the costs -// candidate.setCosts(new Costs(0, this.inputSize)); -// -// // since there is only a single plan for the data-source, return a list with that element only -// List plans = new ArrayList(1); -// plans.add(candidate); -// -// if (isBranching()) { -// this.cachedPlans = plans; -// } -// -// return plans; - return null; + public List getAlternativePlans(CostEstimator estimator) { + if (this.candidate != null) { + return this.candidate; + } + + SourcePlanNode candidate = new SourcePlanNode(this); + candidate.updatePropertiesWithUniqueSets(getUniqueFields()); + candidate.setCosts(new Costs(0, this.inputSize)); + + // since there is only a single plan for the data-source, return a list with that element only + List plans = new ArrayList(1); + plans.add(candidate); + + if (isBranching()) { + this.candidate = plans; + } + return plans; } diff --git 
a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/InterestingProperties.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/InterestingProperties.java index e5cf6c1ab19b496373024133356805559e145e57..51c6bd248705df8c4e2e877fb4b5197b237760a0 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/InterestingProperties.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/InterestingProperties.java @@ -21,7 +21,10 @@ import java.util.List; import eu.stratosphere.pact.compiler.Costs; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.LocalProperties; +import eu.stratosphere.pact.compiler.plan.candidate.Channel; import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; +import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; /** * The interesting properties that a node in the optimizer plan hands to its predecessors. 
It has the @@ -46,7 +49,7 @@ public class InterestingProperties implements Cloneable */ public InterestingProperties() { // instantiate the maximal costs to the possible maximum - this.maximalCosts = new Costs(Long.MAX_VALUE, Long.MAX_VALUE); + this.maximalCosts = new Costs(0, 0); this.globalProps = new GlobalProperties(); this.localProps = new LocalProperties(); @@ -123,12 +126,15 @@ public class InterestingProperties implements Cloneable return globalProps.isMetBy(node.getGlobalProperties()) && localProps.isMetBy(node.getLocalProperties()); } - public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) { - GlobalProperties gp = this.globalProps.filterByNodesConstantSet(node, input); - LocalProperties lp = this.localProps.filterByNodesConstantSet(node, input); + + + public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) + { + final GlobalProperties gp = this.globalProps.filterByNodesConstantSet(node, input); + final LocalProperties lp = this.localProps.filterByNodesConstantSet(node, input); if (gp != this.globalProps || lp != this.localProps) { - if (gp == null && lp == null) { + if ((gp == null || gp.isTrivial()) && (lp == null || lp.isTrivial())) { return null; } else { return new InterestingProperties(this.maximalCosts, @@ -138,6 +144,24 @@ public class InterestingProperties implements Cloneable return this; } } + + public Channel createChannelRealizingProperties(PlanNode inputNode) { + final Channel c = new Channel(inputNode); + + if (this.globalProps.isMetBy(inputNode.getGlobalProperties())) { + c.setShipStrategy(ShipStrategyType.FORWARD); + } else { + this.globalProps.parameterizeChannel(c); + } + + final LocalProperties lps = c.getLocalPropertiesAfterShippingOnly(); + if (this.localProps.isMetBy(lps)) { + c.setLocalStrategy(LocalStrategy.NONE); + } else { + this.localProps.parameterizeChannel(c); + } + return c; + } // ------------------------------------------------------------------------ @@ -188,7 
+212,7 @@ public class InterestingProperties implements Cloneable @Override public InterestingProperties clone() { return new InterestingProperties(this.maximalCosts.clone(), - this.globalProps.clone(), this.localProps.createCopy()); + this.globalProps.clone(), this.localProps.clone()); } // ------------------------------------------------------------------------ @@ -223,7 +247,7 @@ public class InterestingProperties implements Cloneable } } // if it was not subsumed, add it - properties.add(toMerge.clone()); + properties.add(toMerge); } /** @@ -249,6 +273,12 @@ public class InterestingProperties implements Cloneable } + /** + * @param props + * @param node + * @param input + * @return + */ public static final List filterInterestingPropertiesForInput( List props, OptimizerNode node, int input) { @@ -272,6 +302,6 @@ public class InterestingProperties implements Cloneable new InterestingProperties(filteredProps.getMaximalCosts(), topDownAdjustedGP, filteredProps.localProps); mergeUnionOfInterestingProperties(preserved, toAdd); } - return preserved; + return preserved == null ? 
new ArrayList() : preserved; } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java index fa3cb5ba5a9cf6eb49c4542e1ddf07bdccbc09b4..643c9b90122b42ed7bed43be91bbf314b2ef221e 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java @@ -19,15 +19,18 @@ import java.util.List; import eu.stratosphere.pact.common.contract.MapContract; import eu.stratosphere.pact.compiler.costs.CostEstimator; +import eu.stratosphere.pact.compiler.plan.candidate.Channel; +import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; +import eu.stratosphere.pact.compiler.plan.candidate.SingleInputPlanNode; import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; /** - * The Optimizer representation of a Map contract node. + * The optimizer's internal representation of a Map contract node. * * @author Stephan Ewen */ -public class MapNode extends SingleInputNode { - +public class MapNode extends SingleInputNode +{ /** * Creates a new MapNode for the given contract. * @@ -39,28 +42,6 @@ public class MapNode extends SingleInputNode { setLocalStrategy(LocalStrategy.NONE); } -// /** -// * Copy constructor to create a copy a MapNode with a different predecessor. The predecessor -// * is assumed to be of the same type and merely a copy with different strategies, as they -// * are created in the process of the plan enumeration. -// * -// * @param template -// * The node to create a copy of. -// * @param pred -// * The new predecessor. -// * @param conn -// * The old connection to copy properties from. -// * @param globalProps -// * The global properties of this copy. -// * @param localProps -// * The local properties of this copy. 
-// */ -// protected MapNode(MapNode template, OptimizerNode pred, PactConnection conn, GlobalProperties globalProps, -// LocalProperties localProps) { -// super(template, pred, conn, globalProps, localProps); -// setLocalStrategy(LocalStrategy.NONE); -// } - /** * Gets the contract object for this map node. * @@ -109,60 +90,26 @@ public class MapNode extends SingleInputNode { } } - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.SingleInputNode#computeValidPlanAlternatives(java.util.List, eu.stratosphere.pact.compiler.costs.CostEstimator, java.util.List) + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.SingleInputNode#createPlanAlternatives(java.util.List, java.util.List) */ @Override - protected void computeValidPlanAlternatives(List altSubPlans, CostEstimator estimator, List outputPlans) { -// -// // we have to check if all input ShipStrategies are the same or at least compatible -// ShipStrategy ss = new NoneSS(); -// -// // check hint shipping strategy -// ShipStrategy hintSS = this.inConn.getShipStrategy(); -// if(hintSS.type() == ShipStrategyType.BROADCAST || hintSS.type() == ShipStrategyType.SFR) -// // invalid strategy: we do not produce an alternative node -// return; -// else -// ss = hintSS; -// -// // if no hint for a strategy was provided, we use the default -// if(ss.type() == ShipStrategyType.NONE) -// ss = new ForwardSS(); -// -// for(OptimizerNode subPlan : altSubPlans) { -// -// GlobalProperties gp = PactConnection.getGlobalPropertiesAfterConnection(subPlan, this, 0, ss); -// LocalProperties lp = PactConnection.getLocalPropertiesAfterConnection(subPlan, this, ss); -// -// MapNode nMap = new MapNode(this, subPlan, this.inConn, gp, lp); -// nMap.inConn.setShipStrategy(ss); -// -// // now, the properties (copied from the inputs) are filtered by the output contracts -// nMap.getGlobalProperties().filterByNodesConstantSet(this, 0); -// nMap.getLocalProperties().filterByNodesConstantSet(this, 0); -// -// // copy the 
cumulative costs and set the costs of the map itself to zero -// estimator.costOperator(nMap); -// -// outputPlans.add(nMap); -// } + protected void createPlanAlternatives(List inputs, List outputPlans) + { + for (Channel c : inputs) { + outputPlans.add(new SingleInputPlanNode(this, c, this.localStrategy)); + } } - /** * Computes the number of stub calls. * * @return the number of stub calls. */ protected long computeNumberOfStubCalls() { - - if(this.getPredNode() != null) - return this.getPredNode().estimatedNumRecords; + if (getPredecessorNode() != null) + return getPredecessorNode().estimatedNumRecords; else return -1; - } - } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MatchNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MatchNode.java index 4777a14a5130dc0229dfe64703a1f66eba9a6e83..e28fe6d0652db7d43365a866bcee8a33ffa46b5b 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MatchNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MatchNode.java @@ -32,7 +32,7 @@ import eu.stratosphere.pact.compiler.Costs; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.LocalProperties; import eu.stratosphere.pact.compiler.PactCompiler; -import eu.stratosphere.pact.compiler.PartitionProperty; +import eu.stratosphere.pact.compiler.PartitioningProperty; import eu.stratosphere.pact.compiler.costs.CostEstimator; import eu.stratosphere.pact.runtime.shipping.ShipStrategy; import eu.stratosphere.pact.runtime.shipping.ShipStrategy.BroadcastSS; @@ -75,10 +75,6 @@ public class MatchNode extends TwoInputNode { setLocalStrategy(LocalStrategy.HYBRIDHASH_FIRST); } else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) { setLocalStrategy(LocalStrategy.HYBRIDHASH_SECOND); - } else if (PactCompiler.HINT_LOCAL_STRATEGY_INMEM_HASH_BUILD_FIRST.equals(localStrategy)) { - 
setLocalStrategy(LocalStrategy.MMHASH_FIRST); - } else if (PactCompiler.HINT_LOCAL_STRATEGY_INMEM_HASH_BUILD_SECOND.equals(localStrategy)) { - setLocalStrategy(LocalStrategy.MMHASH_SECOND); } else if (PactCompiler.HINT_LOCAL_STRATEGY_SORT_SELF_NESTEDLOOP.equals(localStrategy)) { setLocalStrategy(LocalStrategy.SORT_SELF_NESTEDLOOP); } else if (PactCompiler.HINT_LOCAL_STRATEGY_SELF_NESTEDLOOP.equals(localStrategy)) { @@ -91,30 +87,30 @@ public class MatchNode extends TwoInputNode { } } - /** - * Copy constructor to create a copy of a node with different predecessors. The predecessors - * is assumed to be of the same type as in the template node and merely copies with different - * strategies, as they are created in the process of the plan enumeration. - * - * @param template - * The node to create a copy of. - * @param pred1 - * The new predecessor for the first input. - * @param pred2 - * The new predecessor for the second input. - * @param conn1 - * The old connection of the first input to copy properties from. - * @param conn2 - * The old connection of the second input to copy properties from. - * @param globalProps - * The global properties of this copy. - * @param localProps - * The local properties of this copy. - */ - protected MatchNode(MatchNode template, OptimizerNode pred1, OptimizerNode pred2, PactConnection conn1, - PactConnection conn2, GlobalProperties globalProps, LocalProperties localProps) { - super(template, pred1, pred2, conn1, conn2, globalProps, localProps); - } +// /** +// * Copy constructor to create a copy of a node with different predecessors. The predecessors +// * is assumed to be of the same type as in the template node and merely copies with different +// * strategies, as they are created in the process of the plan enumeration. +// * +// * @param template +// * The node to create a copy of. +// * @param pred1 +// * The new predecessor for the first input. +// * @param pred2 +// * The new predecessor for the second input. 
+// * @param conn1 +// * The old connection of the first input to copy properties from. +// * @param conn2 +// * The old connection of the second input to copy properties from. +// * @param globalProps +// * The global properties of this copy. +// * @param localProps +// * The local properties of this copy. +// */ +// protected MatchNode(MatchNode template, OptimizerNode pred1, OptimizerNode pred2, PactConnection conn1, +// PactConnection conn2, GlobalProperties globalProps, LocalProperties localProps) { +// super(template, pred1, pred2, conn1, conn2, globalProps, localProps); +// } // ------------------------------------------------------------------------ @@ -150,8 +146,6 @@ public class MatchNode extends TwoInputNode { case MERGE: return 1; case HYBRIDHASH_FIRST: return 1; case HYBRIDHASH_SECOND: return 1; - case MMHASH_FIRST: return 1; - case MMHASH_SECOND: return 1; case SORT_SELF_NESTEDLOOP: return 2; case SELF_NESTEDLOOP: return 1; default: return 0; @@ -237,7 +231,7 @@ public class MatchNode extends TwoInputNode { } // partition and any order - p.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)keys.clone()); + p.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, (FieldList)keys.clone()); Ordering ordering = new Ordering(); for (Integer index : getPactContract().getKeyColumnNumbers(inputNum)) { @@ -254,7 +248,7 @@ public class MatchNode extends TwoInputNode { // partition only p = new InterestingProperties(); - p.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)keys.clone()); + p.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, (FieldList)keys.clone()); estimator.getHashPartitioningCost(input, p.getMaximalCosts()); InterestingProperties.mergeUnionOfInterestingProperties(target, p); } @@ -353,17 +347,17 @@ public class MatchNode extends TwoInputNode { // 1) re-partition 2 the same way as 1 // 2) re-partition 1 the same way as 2 - if (gp1.getPartitioning() == 
PartitionProperty.HASH_PARTITIONED) { + if (gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { createLocalAlternatives(outputPlans, subPlan1, subPlan2, new ForwardSS(), new PartitionHashSS(this.keySet2), estimator); - } else if (gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { createLocalAlternatives(outputPlans, subPlan1, subPlan2, new ForwardSS(), new PartitionRangeSS(this.keySet2), estimator); } - if (gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + if (gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), new ForwardSS(), estimator); - } else if (gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1), new ForwardSS(), estimator); } @@ -384,7 +378,7 @@ public class MatchNode extends TwoInputNode { // we create an additional plan with a range partitioning // if this is not already a range partitioning - if (gp1.getPartitioning() != PartitionProperty.RANGE_PARTITIONED) { + if (gp1.getPartitioning() != PartitioningProperty.RANGE_PARTITIONED) { createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1), new PartitionRangeSS(this.keySet2), estimator); } @@ -394,12 +388,12 @@ public class MatchNode extends TwoInputNode { // add two plans: // 1) make input 2 the same partitioning as input 1 // 2) partition both inputs with a different partitioning function (hash <-> range) - if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { createLocalAlternatives(outputPlans, subPlan1, 
subPlan2, ss1, new PartitionHashSS(this.keySet2), estimator); // createLocalAlternatives(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE, // ShipStrategy.PARTITION_RANGE, estimator); - } else if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { createLocalAlternatives(outputPlans, subPlan1, subPlan2, ss1, new PartitionRangeSS(this.keySet2), estimator); createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), @@ -415,12 +409,12 @@ public class MatchNode extends TwoInputNode { // add two plans: // 1) make input 1 the same partitioning as input 2 // 2) partition both inputs with a different partitioning function (hash <-> range) - if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), ss2, estimator); // createLocalAlternatives(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE, // ShipStrategy.PARTITION_RANGE, estimator); - } else if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1), ss2, estimator); createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), @@ -465,9 +459,9 @@ public class MatchNode extends TwoInputNode { case FORWARD: if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning().isPartitioned()) { // adapt to the partitioning - if (gp2.getPartitioning() == 
PartitionProperty.HASH_PARTITIONED) { + if (gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { ss1 = new PartitionHashSS(this.keySet1); - } else if (gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { ss1 = new PartitionRangeSS(this.keySet1); } else { throw new CompilerException(); @@ -478,11 +472,11 @@ public class MatchNode extends TwoInputNode { } break; case PARTITION_HASH: - ss1 = (partitioningIsOnSameSubkey(gp1.getPartitionedFields(), this.keySet2) && gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) ? new ForwardSS() + ss1 = (partitioningIsOnSameSubkey(gp1.getPartitionedFields(), this.keySet2) && gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) ? new ForwardSS() : new PartitionHashSS(this.keySet1); break; case PARTITION_RANGE: - ss1 = (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) ? new ForwardSS() + ss1 = (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) ? 
new ForwardSS() : new PartitionRangeSS(this.keySet1); break; default: @@ -510,9 +504,9 @@ public class MatchNode extends TwoInputNode { case FORWARD: if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning().isPartitioned()) { // adapt to the partitioning - if (gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + if (gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { ss2 = new PartitionHashSS(this.keySet2); - } else if (gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + } else if (gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { ss2 = new PartitionRangeSS(this.keySet2); } else { throw new CompilerException(); @@ -523,11 +517,11 @@ public class MatchNode extends TwoInputNode { } break; case PARTITION_HASH: - ss2 = (partitioningIsOnSameSubkey(this.keySet1, gp2.getPartitionedFields()) && partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) ? new ForwardSS() + ss2 = (partitioningIsOnSameSubkey(this.keySet1, gp2.getPartitionedFields()) && partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) ? new ForwardSS() : new PartitionHashSS(this.keySet2); break; case PARTITION_RANGE: - ss2 = (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) ? new ForwardSS() + ss2 = (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) ? 
new ForwardSS() : new PartitionRangeSS(this.keySet2); break; default: @@ -1037,7 +1031,7 @@ public class MatchNode extends TwoInputNode { default: throw new CompilerException("Invalid input number "+inputNum+" for Match."); } - if (gp.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) { + if (gp.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { return keyFields.equals(partitionedFields); } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java index 26a7e00385558a461d262eeb81971586417581d9..1fb5f9d4179981f694d131134ab82875d4f517e2 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java @@ -35,6 +35,7 @@ import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.DataStatistics; import eu.stratosphere.pact.compiler.costs.CostEstimator; +import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; import eu.stratosphere.pact.compiler.util.PactType; import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; @@ -211,7 +212,7 @@ public abstract class OptimizerNode implements Visitable, Estimat * The cost estimator used to estimate the costs of each plan alternative. * @return A list containing all plan alternatives. */ - public abstract List getAlternativePlans(CostEstimator estimator); + public abstract List getAlternativePlans(CostEstimator estimator); /** * This method implements the visit of a depth-first graph traversing visitor. 
Implementors must first @@ -498,21 +499,32 @@ public abstract class OptimizerNode implements Visitable, Estimat * This method returns copies of the original interesting properties objects and * leaves the original objects, contained by the connections, unchanged. */ - public void computeInterestingProperties() + public void computeUnionOfInterestingPropertiesFromSuccessors() { List conns = getOutgoingConnections(); - - List props = null; - for (PactConnection conn : conns) { - List ips = conn.getInterestingProperties(); - if (ips.size() > 0) { - if (props == null) { - props = new ArrayList(); + if (conns.size() == 0) { + // no outgoing connections, so there are no interesting properties to take over + this.intProps = Collections.emptyList(); + } else if (conns.size() == 1) { + // exactly one outgoing connection: no union needed, just copy its interesting properties + List ips = conns.get(0).getInterestingProperties(); + this.intProps = ips.isEmpty() ? + Collections.emptyList() : + new ArrayList(ips); + } else { + // union them + List props = null; + for (PactConnection conn : conns) { + List ips = conn.getInterestingProperties(); + if (ips.size() > 0) { + if (props == null) { + props = new ArrayList(); + } + InterestingProperties.mergeUnionOfInterestingProperties(props, ips); } - InterestingProperties.mergeUnionOfInterestingProperties(props, ips); } + this.intProps = (props == null || props.isEmpty()) ? Collections.emptyList() : props; } - this.intProps = (props == null || props.isEmpty()) ? 
Collections.emptyList() : props; } /** @@ -714,8 +726,10 @@ public abstract class OptimizerNode implements Visitable, Estimat { if (this.outgoingConnections.size() == 1) { // return our own stack of open branches, because nothing is added - if (this.openBranches == null) return null; - return new ArrayList(this.openBranches); + if (this.openBranches == null) + return null; + else + return new ArrayList(this.openBranches); } else if (this.outgoingConnections.size() > 1) { // we branch add a branch info to the stack @@ -749,11 +763,12 @@ public abstract class OptimizerNode implements Visitable, Estimat protected void removeClosedBranches(List openList) { - if (openList == null || openList.isEmpty() || closedBranchingNodes == null || closedBranchingNodes.isEmpty()) return; + if (openList == null || openList.isEmpty() || this.closedBranchingNodes == null || this.closedBranchingNodes.isEmpty()) + return; Iterator it = openList.iterator(); while (it.hasNext()) { - if (closedBranchingNodes.contains(it.next().getBranchingNode())) { + if (this.closedBranchingNodes.contains(it.next().getBranchingNode())) { //this branch was already closed --> remove it from the list it.remove(); } @@ -761,7 +776,8 @@ public abstract class OptimizerNode implements Visitable, Estimat } protected void addClosedBranches(Set alreadyClosed) { - if (alreadyClosed == null || alreadyClosed.isEmpty()) return; + if (alreadyClosed == null || alreadyClosed.isEmpty()) + return; if (this.closedBranchingNodes == null) this.closedBranchingNodes = new HashSet(alreadyClosed); else diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java index 4affa9aedc752617a8880e32f484671d65d75886..7df6c92e169e1a75824b10f17d9500f20f3d34ef 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java +++ 
b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java @@ -25,7 +25,7 @@ import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.LocalProperties; -import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; /** * A connection between to PACTs. Represents a channel together with a data shipping @@ -78,29 +78,6 @@ public class PactConnection implements EstimateProvider this.shipStrategy = shipStrategy; } -// /** -// * Creates a copy of the given connection, but sets as source and target the given nodes. -// * This constructor is intended as a partial-copy constructor for the enumeration of -// * plans in the optimization phase. -// * -// * @param template -// * The connection to copy the properties from. -// * @param source -// * The reference to the source node. -// * @param target -// * The reference to the target node. -// */ -// public PactConnection(PactConnection template, OptimizerNode source, OptimizerNode target) { -// this.sourcePact = source; -// this.targetPact = target; -// -// this.shipStrategy = template.shipStrategy; -// this.replicationFactor = template.replicationFactor; -// this.tempMode = template.tempMode; -// -// this.interestingProps = template.interestingProps; -// } - /** * Gets the source of the connection. * @@ -294,62 +271,6 @@ public class PactConnection implements EstimateProvider return buf.toString(); } - /** - * Gets the global properties of the source's output after it crossed a pact connection with - * the given shipping strategy. - * Global properties are maintained on FORWARD connections. - * If a partitioning happens, then a partitioning property exists afterwards. - * A BROADCAST connection destroys the key uniqueness. - *

- * If the shipping strategy has not yet been determined, the properties of the connections source are returned. - * - * @return The properties of the data after this channel. - */ - public static GlobalProperties getGlobalPropertiesAfterConnection(OptimizerNode source, OptimizerNode target, int targetInputNum, ShipStrategyType shipMode) { -// GlobalProperties gp = source.getGlobalPropertiesForParent(target); -// -// switch (shipMode.type()) { -// case BROADCAST: -// gp.reset(); -// break; -// case PARTITION_RANGE: -// gp.setPartitioning(PartitionProperty.RANGE_PARTITIONED, ((PartitionShipStrategy)shipMode).getPartitionFields()); -// gp.setOrdering(null); -// break; -// case PARTITION_HASH: -// gp.setPartitioning(PartitionProperty.HASH_PARTITIONED, ((PartitionShipStrategy)shipMode).getPartitionFields()); -// gp.setOrdering(null); -// break; -// case FORWARD: -// if (source.getDegreeOfParallelism() > target.getDegreeOfParallelism()) { -// gp.setOrdering(null); -// } -// -// if (gp.getPartitioning() == PartitionProperty.NONE) { -// if (source.getUniqueFields().size() > 0) { -// FieldList partitionedFields = new FieldList(); -// //TODO maintain a list of partitioned fields in global properties -// //Up to now: only add first unique fieldset -// for (Integer field : source.getUniqueFields().iterator().next()) { -// partitionedFields.add(field); -// } -// gp.setPartitioning(PartitionProperty.ANY, partitionedFields); -// } -// } -// -// // nothing else changes -// break; -// case NONE: -// throw new CompilerException( -// "Cannot determine properties after connection, id shipping strategy is not set."); -// case SFR: -// default: - throw new CompilerException("Unsupported shipping strategy: " + shipMode.name()); -// } -// -// return gp; - } - /** * Gets the local properties of the sources output after it crossed a pact connection with the given * strategy. Local properties are only maintained on FORWARD connections. 
diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/ReduceNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/ReduceNode.java index 0102ad0db8ca09989a65e305788838e3195e098b..62102fcb7ce50d57367df7214072474c6d07a245 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/ReduceNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/ReduceNode.java @@ -21,15 +21,14 @@ import java.util.List; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.pact.common.contract.CompilerHints; import eu.stratosphere.pact.common.contract.ReduceContract; -import eu.stratosphere.pact.common.util.FieldList; import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.CompilerException; -import eu.stratosphere.pact.compiler.Costs; import eu.stratosphere.pact.compiler.DataStatistics; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.PactCompiler; -import eu.stratosphere.pact.compiler.PartitionProperty; +import eu.stratosphere.pact.compiler.PartitioningProperty; import eu.stratosphere.pact.compiler.costs.CostEstimator; +import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; /** @@ -172,14 +171,14 @@ public class ReduceNode extends SingleInputNode { // add the first interesting properties: partitioned and grouped InterestingProperties ip1 = new InterestingProperties(); - ip1.getGlobalProperties().setPartitioning(PartitionProperty.ANY, this.keys); + ip1.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, this.keys); ip1.getLocalProperties().setGroupedFields(this.keys); estimator.addHashPartitioningCost(this.inConn, ip1.getMaximalCosts()); estimator.addLocalSortCost(this.inConn, -1, ip1.getMaximalCosts()); // add the second interesting properties: partitioned only InterestingProperties ip2 = new 
InterestingProperties(); - ip2.getGlobalProperties().setPartitioning(PartitionProperty.ANY, this.keys); + ip2.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, this.keys); estimator.addHashPartitioningCost(this.inConn, ip2.getMaximalCosts()); InterestingProperties.mergeUnionOfInterestingProperties(props, ip1); @@ -188,8 +187,8 @@ public class ReduceNode extends SingleInputNode { } @Override - protected void computeValidPlanAlternatives(List altSubPlans, - CostEstimator estimator, List outputPlans) + protected void computeValidPlanAlternatives(List altSubPlans, + CostEstimator estimator, List outputPlans) { // // FieldSet keySet = new FieldSet(getPactContract().getKeyColumnNumbers(0)); @@ -319,7 +318,7 @@ public class ReduceNode extends SingleInputNode { if(this.getPredNode() != null) { // return key count of predecessor - return this.getPredNode().getEstimatedCardinality(new FieldSet(this.keyList)); + return this.getPredNode().getEstimatedCardinality(this.keys); } else return -1; } @@ -420,15 +419,14 @@ public class ReduceNode extends SingleInputNode { @Override public List

createUniqueFieldsForNode() { - if (this.keyList != null) { - for (int keyField : this.keyList) { + if (this.keys != null) { + for (int keyField : this.keys) { if (!isFieldConstant(0, keyField)) { return null; } } - return Collections.singletonList(new FieldSet(keyList)); + return Collections.singletonList(this.keys); } return null; } - } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SingleInputNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SingleInputNode.java index 8dc3cd62aec53d938acb08be8a998dd9a79613d6..156ad347375b3667907cc7c786ea86a89e0e3d7e 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SingleInputNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SingleInputNode.java @@ -27,96 +27,49 @@ import eu.stratosphere.pact.common.contract.SingleInputContract; import eu.stratosphere.pact.common.plan.Visitor; import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFields; import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFieldsExcept; -import eu.stratosphere.pact.common.type.Key; import eu.stratosphere.pact.common.util.FieldSet; -import eu.stratosphere.pact.compiler.ColumnWithType; import eu.stratosphere.pact.compiler.CompilerException; -import eu.stratosphere.pact.compiler.OptimizerFieldSet; import eu.stratosphere.pact.compiler.PactCompiler; import eu.stratosphere.pact.compiler.costs.CostEstimator; -import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; +import eu.stratosphere.pact.compiler.plan.candidate.Channel; +import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; +import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; /** - * A node in the optimizer plan that represents a PACT with a single input. + * A node in the optimizer's program representation for a PACT with a single input. 
* * @author Stephan Ewen */ -public abstract class SingleInputNode extends OptimizerNode { - - // ------------- Node Connection +public abstract class SingleInputNode extends OptimizerNode +{ + protected PactConnection inConn; // the input of the node - protected PactConnection inConn; // the input of the node - - // ------------- Stub Annotations + protected FieldSet constantSet; // set of fields that are left unchanged by the stub + protected FieldSet notConstantSet; // set of fields that are changed by the stub - protected FieldSet constantSet; // set of fields that are left unchanged by the stub - protected FieldSet notConstantSet; // set of fields that are changed by the stub + protected final FieldSet keys; // The set of key fields - protected final OptimizerFieldSet keys; // The set of key fields + private List cachedPlans; - // ------------------------------ + // -------------------------------------------------------------------------------------------- /** * Creates a new node with a single input for the optimizer plan. * - * @param pactContract - * The PACT that the node represents. + * @param pactContract The PACT that the node represents. */ public SingleInputNode(SingleInputContract pactContract) { super(pactContract); - - this.keys = new OptimizerFieldSet(); - int[] keyPos = pactContract.getKeyColumnNumbers(0); - Class[] keyTypes = pactContract.getKeyClasses(); - if (keyPos != null) { - for (int i = 0; i < keyPos.length; i++) { - this.keys.add(new ColumnWithType(keyPos[i], keyTypes[i])); - } - } + this.keys = new FieldSet(pactContract.getKeyColumnNumbers(0)); } -// /** -// * Copy constructor to create a copy of a node with a different predecessor. The predecessor -// * is assumed to be of the same type and merely a copy with different strategies, as they -// * are created in the process of the plan enumeration. -// * -// * @param template -// * The node to create a copy of. -// * @param predNode -// * The new predecessor. 
-// * @param inConn -// * The old connection to copy properties from. -// * @param globalProps -// * The global properties of this copy. -// * @param localProps -// * The local properties of this copy. -// */ -// protected SingleInputNode(SingleInputNode template, OptimizerNode predNode, PactConnection inConn, -// GlobalProperties globalProps, LocalProperties localProps) { -// super(template, globalProps, localProps); -// -// // copy annotations -// this.constantSet = template.constantSet; -// -// // copy key set -// this.keyList = template.keyList; -// -// // copy input connection -// this.inConn = new PactConnection(inConn, predNode, this); -// -// if (this.branchPlan == null) { -// this.branchPlan = predNode.branchPlan; -// } else if (predNode.branchPlan != null) { -// this.branchPlan.putAll(predNode.branchPlan); -// } -// } - /** * Gets the PactConnection through which this node receives its input. * * @return The input connection. */ - public PactConnection getInConn() { + public PactConnection getIncomingConnection() { return this.inConn; } @@ -126,7 +79,7 @@ public abstract class SingleInputNode extends OptimizerNode { * @param conn * The input connection to set. */ - public void setInConn(PactConnection inConn) { + public void setIncomingConnection(PactConnection inConn) { this.inConn = inConn; } @@ -135,8 +88,8 @@ public abstract class SingleInputNode extends OptimizerNode { * * @return The predecessor of this node. 
*/ - public OptimizerNode getPredNode() { - if(this.inConn != null) { + public OptimizerNode getPredecessorNode() { + if (this.inConn != null) { return this.inConn.getSourcePact(); } else { return null; @@ -174,7 +127,7 @@ public abstract class SingleInputNode extends OptimizerNode { } // create the connection and add it PactConnection conn = new PactConnection(pred, this); - this.setInConn(conn); + setIncomingConnection(conn); pred.addOutgoingConnection(conn); // see if an internal hint dictates the strategy to use @@ -209,7 +162,7 @@ public abstract class SingleInputNode extends OptimizerNode { * * @return The key fields of this optimizer node. */ - public OptimizerFieldSet getKeySet() { + public FieldSet getKeySet() { return this.keys; } @@ -222,39 +175,53 @@ public abstract class SingleInputNode extends OptimizerNode { * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getAlternativePlans() */ @Override - final public List getAlternativePlans(CostEstimator estimator) { -// // check if we have a cached version -// if (this.cachedPlans != null) { -// return this.cachedPlans; -// } -// -// // calculate alternative subplans for predecessor -// List subPlans = this.getPredNode().getAlternativePlans(estimator); + final public List getAlternativePlans(CostEstimator estimator) { + // check if we have a cached version + if (this.cachedPlans != null) { + return this.cachedPlans; + } - List outputPlans = new ArrayList(); -// -// computeValidPlanAlternatives(subPlans, estimator, outputPlans); -// -// // prune the plans -// prunePlanAlternatives(outputPlans); -// -// // cache the result only if we have multiple outputs --> this function gets invoked multiple times -// if (this.getOutConns() != null && this.getOutConns().size() > 1) { -// this.cachedPlans = outputPlans; -// } + // calculate alternative subplans for predecessor + List subPlans = getPredecessorNode().getAlternativePlans(estimator); + List candidates = new ArrayList(subPlans.size()); + + List ips = 
this.inConn.getInterestingProperties(); + for (PlanNode p : subPlans) { + if (ips.isEmpty()) { + // create a simple forwarding channel + Channel c = new Channel(p); + c.setShipStrategy(ShipStrategyType.FORWARD); + c.setLocalStrategy(LocalStrategy.NONE); + candidates.add(c); + } else { + for (InterestingProperties ip : ips) { + // create a channel that realizes the properties + candidates.add(ip.createChannelRealizingProperties(p)); + } + } + } + + List outputPlans = new ArrayList(); + createPlanAlternatives(candidates, outputPlans); + + // prune the plans + prunePlanAlternatives(outputPlans); + + // cache the result only if we have multiple outputs --> this function gets invoked multiple times + if (isBranching()) { + this.cachedPlans = outputPlans; + } return outputPlans; } /** - * Takes a list with all subplans and produces alternative plans for the current node + * Takes a list with all sub-plans and produces alternative plans for the current node. * - * @param altSubPlans Alternative subplans - * @param estimator Cost estimator to be used - * @param outputPlans The generated output plans + * @param inputs The different input alternatives for the current node. + * @param outputPlans The generated output plan candidates. 
*/ - protected abstract void computeValidPlanAlternatives(List altSubPlans, - CostEstimator estimator, List outputPlans); + protected abstract void createPlanAlternatives(List inputs, List outputPlans); // -------------------------------------------------------------------------------------------- // Branch Handling @@ -270,17 +237,17 @@ public abstract class SingleInputNode extends OptimizerNode { return; } - addClosedBranches(this.getPredNode().closedBranchingNodes); + addClosedBranches(getPredecessorNode().closedBranchingNodes); List result = new ArrayList(); // TODO: check if merge of lists is really necessary - result = mergeLists(result, this.getPredNode().getBranchesForParent(this)); + result = mergeLists(result, getPredecessorNode().getBranchesForParent(this)); this.openBranches = result; } // -------------------------------------------------------------------------------------------- - // Stub Annotation Handling + // Estimates Computation // -------------------------------------------------------------------------------------------- /** @@ -301,8 +268,8 @@ public abstract class SingleInputNode extends OptimizerNode { final long numRecords = computeNumberOfStubCalls(); long outputSize = 0; - if (this.getPredNode() != null) { - outputSize = this.getPredNode().estimatedOutputSize; + if (getPredecessorNode() != null) { + outputSize = getPredecessorNode().estimatedOutputSize; } // compute width only if we have information @@ -368,8 +335,8 @@ public abstract class SingleInputNode extends OptimizerNode { @Override public void accept(Visitor visitor) { if (visitor.preVisit(this)) { - if (this.getPredNode() != null) { - this.getPredNode().accept(visitor); + if (getPredecessorNode() != null) { + getPredecessorNode().accept(visitor); } else { throw new CompilerException(); } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/UnionNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/UnionNode.java index 
13d555bd0496f2e2c3a5c889fc1897bf441bb400..d04a47e96f99cdecb8764a755ca4dce22c89dcfe 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/UnionNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/UnionNode.java @@ -32,7 +32,7 @@ import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.DataStatistics; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.LocalProperties; -import eu.stratosphere.pact.compiler.PartitionProperty; +import eu.stratosphere.pact.compiler.PartitioningProperty; import eu.stratosphere.pact.compiler.costs.CostEstimator; import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ForwardSS; import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; @@ -232,10 +232,10 @@ public class UnionNode extends OptimizerNode { // only property which would survive is a hash partitioning on every input GlobalProperties gpForInput = alternative.getGlobalPropertiesForParent(this); - if (index == 0 && gpForInput.getPartitioning() == PartitionProperty.HASH_PARTITIONED) { + if (index == 0 && gpForInput.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { newPartitionedFieldsInCommon = gpForInput.getPartitionedFields(); } - else if (gpForInput.getPartitioning() != PartitionProperty.HASH_PARTITIONED + else if (gpForInput.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || gpForInput.getPartitionedFields().equals(partitionedFieldsInCommon) == false) { newPartitionedFieldsInCommon = null; } @@ -250,7 +250,7 @@ public class UnionNode extends OptimizerNode { GlobalProperties gp = new GlobalProperties(); if (newPartitionedFieldsInCommon != null) { - gp.setPartitioning(PartitionProperty.HASH_PARTITIONED, newPartitionedFieldsInCommon); + gp.setPartitioning(PartitioningProperty.HASH_PARTITIONED, newPartitionedFieldsInCommon); } UnionNode unionNode = new UnionNode(this, subplanStack, gp, new LocalProperties()); 
unionNode.branchPlan = newBranchPlan; diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java index 326a842f011c21e6bf5dc9237d7c44cf38083934..120846b37ee909211281c28de3a7e52652ee0a8d 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java @@ -15,12 +15,21 @@ package eu.stratosphere.pact.compiler.plan.candidate; -import eu.stratosphere.pact.compiler.plan.EstimateProvider; -import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; +import java.util.HashMap; +import java.util.Map; +import eu.stratosphere.pact.common.contract.Order; +import eu.stratosphere.pact.common.contract.Ordering; +import eu.stratosphere.pact.common.util.FieldList; +import eu.stratosphere.pact.common.util.FieldSet; +import eu.stratosphere.pact.compiler.CompilerException; +import eu.stratosphere.pact.compiler.GlobalProperties; +import eu.stratosphere.pact.compiler.LocalProperties; +import eu.stratosphere.pact.compiler.plan.EstimateProvider; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; +import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; /** - * * * @author Stephan Ewen */ @@ -37,11 +46,25 @@ public class Channel implements EstimateProvider // -------------------------------------------------------------------------------------------- - private final PlanNode source = null; + private PlanNode source; + + private PlanNode target; + + private ShipStrategyType shipStrategy = ShipStrategyType.NONE; + + private LocalStrategy localStrategy = LocalStrategy.NONE; + + private FieldList shipKeys; - private final PlanNode target = null; + private FieldList localKeys; - private ShipStrategyType shipStrategy; + private boolean[] shipSortOrder; + + private boolean[] 
localSortOrder; + + private GlobalProperties globalProps; + + private LocalProperties localProps; private TempMode tempMode; @@ -49,6 +72,12 @@ public class Channel implements EstimateProvider // -------------------------------------------------------------------------------------------- + public Channel(PlanNode sourceNode) { + this.source = sourceNode; + } + + // -------------------------------------------------------------------------------------------- + /** * Gets the source of this Channel. * @@ -67,10 +96,43 @@ public class Channel implements EstimateProvider return this.target; } + public void setShipStrategy(ShipStrategyType strategy) { + setShipStrategy(strategy, null, null); + } + + public void setShipStrategy(ShipStrategyType strategy, FieldList keys) { + setShipStrategy(strategy, keys, null); + } + + public void setShipStrategy(ShipStrategyType strategy, FieldList keys, boolean[] sortDirection) { + this.shipStrategy = strategy; + this.shipKeys = keys; + this.shipSortOrder = sortDirection; + } + + public void setLocalStrategy(LocalStrategy strategy) { + setLocalStrategy(strategy, null, null); + } + + public void setLocalStrategy(LocalStrategy strategy, FieldList keys) { + setLocalStrategy(strategy, keys, null); + } + + public void setLocalStrategy(LocalStrategy strategy, FieldList keys, boolean[] sortDirection) { + this.localStrategy = strategy; + this.localKeys = keys; + this.localSortOrder = sortDirection; + } + public ShipStrategyType getShipStrategy() { return this.shipStrategy; } + public FieldList getShipStrategyKeys() { + return this.shipKeys; + } + + /** * Returns the TempMode of the Connection. 
NONE if the connection is not temped, * TEMP_SENDER_SIDE if the connection is temped on the sender node, and @@ -100,4 +162,119 @@ public class Channel implements EstimateProvider public int getReplicationFactor() { return this.replicationFactor; } + + // -------------------------------------------------------------------------------------------- + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedOutputSize() + */ + @Override + public long getEstimatedOutputSize() { + return this.source.template.getEstimatedOutputSize() * this.replicationFactor; + } + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedNumRecords() + */ + @Override + public long getEstimatedNumRecords() { + return this.source.template.getEstimatedNumRecords() * this.replicationFactor; + } + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedCardinalities() + */ + @Override + public Map getEstimatedCardinalities() { + final Map m = this.source.template.getEstimatedCardinalities(); + final Map res = new HashMap(); + for (Map.Entry entry : m.entrySet()) { + res.put(entry.getKey(), entry.getValue() * this.replicationFactor); + } + return res; + } + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedCardinality(eu.stratosphere.pact.common.util.FieldSet) + */ + @Override + public long getEstimatedCardinality(FieldSet cP) { + return this.source.template.getEstimatedCardinality(cP) * this.replicationFactor; + } + + // -------------------------------------------------------------------------------------------- + + + public GlobalProperties getGlobalProperties() { + if (this.globalProps == null) { + this.globalProps = this.source.getGlobalProperties().clone(); + switch (this.shipStrategy) { + case BROADCAST: + this.globalProps.setFullyReplicated(); + break; + case PARTITION_HASH: + case PARTITION_LOCAL_HASH: + 
this.globalProps.setHashPartitioned(this.shipKeys); + break; + case PARTITION_RANGE: + case PARTITION_LOCAL_RANGE: + Ordering o = new Ordering(); + for (int i = 0; i < this.shipKeys.size(); i++) { + o.appendOrdering(this.shipKeys.get(i), null, this.shipSortOrder == null || this.shipSortOrder[i] ? Order.ASCENDING : Order.DESCENDING); + } + this.globalProps.setRangePartitioned(o); + break; + case FORWARD: + break; + case NONE: + throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set."); + } + } + + return this.globalProps; + } + + public LocalProperties getLocalProperties() { + if (this.localProps == null) { + this.localProps = getLocalPropertiesAfterShippingOnly().clone(); + switch (this.localStrategy) { + case NONE: + break; + case SORT: + Ordering o = new Ordering(); + for (int i = 0; i < this.localKeys.size(); i++) { + o.appendOrdering(this.localKeys.get(i), null, this.localSortOrder == null || this.localSortOrder[i] ? Order.ASCENDING : Order.DESCENDING); + } + this.localProps.setOrdering(o); + this.localProps.setGroupedFields(o.getInvolvedIndexes()); + break; + default: + throw new CompilerException("Unsupported local strategy for channel."); + } + } + + return this.localProps; + } + + public LocalProperties getLocalPropertiesAfterShippingOnly() { + if (this.shipStrategy == ShipStrategyType.FORWARD) { + return this.source.getLocalProperties(); + } else { + final LocalProperties props = this.source.getLocalProperties().clone(); + switch (this.shipStrategy) { + case BROADCAST: + case PARTITION_HASH: + case PARTITION_RANGE: + case PARTITION_LOCAL_HASH: + case PARTITION_LOCAL_RANGE: + props.reset(); + break; + case NONE: + throw new CompilerException("ShipStrategy has not yet been set."); + default: + throw new CompilerException(); + } + return props; + } + } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java 
b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java index ad52645f9e78783878d2012927be162580deb6e4..e42cddd20f80f3cbd4f200c792bcb82352edefac 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java @@ -15,55 +15,69 @@ package eu.stratosphere.pact.compiler.plan.candidate; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import java.util.Map; +import java.util.Set; import eu.stratosphere.pact.common.plan.Visitable; -import eu.stratosphere.pact.compiler.CompilerException; +import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.Costs; import eu.stratosphere.pact.compiler.GlobalProperties; import eu.stratosphere.pact.compiler.LocalProperties; +import eu.stratosphere.pact.compiler.PactCompiler; import eu.stratosphere.pact.compiler.plan.OptimizerNode; import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; - /** * * @author Stephan Ewen */ public abstract class PlanNode implements Visitable { - protected final OptimizerNode template = null; - + protected final OptimizerNode template; - protected final LocalProperties localProps; // local properties of the data produced by this node - - protected final GlobalProperties globalProps; // global properties of the data produced by this node + private final LocalStrategy localStrategy; // The local strategy (sorting / hashing, ...) - private final LocalStrategy localStrategy; // The local strategy (sorting / hashing, ...) 
+ protected LocalProperties localProps; // local properties of the data produced by this node + + protected GlobalProperties globalProps; // global properties of the data produced by this node protected Map branchPlan; // the actual plan alternative chosen at a branch point - protected PlanNode lastJoinedBranchNode; // the node with latest branch (node with multiple outputs) - // that both children share and that is at least partially joined + protected PlanNode lastJoinedBranchNode; // the node with latest branch (node with multiple outputs) + // that both children share and that is at least partially joined - private Costs nodeCosts; // the costs incurred by this node + private Costs nodeCosts; // the costs incurred by this node - private Costs cumulativeCosts; // the cumulative costs of all operators in the sub-tree of this node + private Costs cumulativeCosts; // the cumulative costs of all operators in the sub-tree - private int memoryPerTask; // the amount of memory dedicated to each task, in MiBytes + private int memoryPerTask; // the amount of memory dedicated to each task, in MiBytes - protected boolean pFlag; // flag for the internal pruning algorithm + protected boolean pFlag; // flag for the internal pruning algorithm // -------------------------------------------------------------------------------------------- + public PlanNode(OptimizerNode template) { + this(template, template.getLocalStrategy()); + } + + public PlanNode(OptimizerNode template, LocalStrategy strategy) { + this(template, strategy, new LocalProperties(), new GlobalProperties()); + } + + public PlanNode(OptimizerNode template, LocalStrategy strategy, LocalProperties localProps, GlobalProperties globalProps) + { + this.template = template; + this.localStrategy = strategy == null ? 
LocalStrategy.NONE : strategy; + this.localProps = localProps; + this.globalProps = globalProps; + } + // -------------------------------------------------------------------------------------------- // Accessors @@ -155,7 +169,7 @@ public abstract class PlanNode implements Visitable } public int getDegreeOfParallelism() { - return 1; + return this.template.getDegreeOfParallelism(); } public long getTotalAvailableMemory() { @@ -163,7 +177,7 @@ public abstract class PlanNode implements Visitable } // -------------------------------------------------------------------------------------------- - // + // Input and Predecessors // -------------------------------------------------------------------------------------------- public abstract Iterator getInputs(); @@ -174,188 +188,205 @@ public abstract class PlanNode implements Visitable // Branching and Pruning // -------------------------------------------------------------------------------------------- - /** - * Checks whether to candidate plans for the sub-plan of this node are comparable. The two - * alternative plans are comparable, if - * a) There is no branch in the sub-plan of this node - * b) Both candidates have the same candidate as the child at the last open branch. - * - * @param subPlan1 - * @param subPlan2 - * @return - */ - protected boolean areBranchCompatible(PlanNode subPlan1, PlanNode subPlan2) - { - if (subPlan1 == null || subPlan2 == null) - throw new CompilerException("SubPlans may not be null."); - - // if there is no open branch, the children are always compatible. - // in most plans, that will be the dominant case - if (this.lastJoinedBranchNode == null) { - return true; - } - - final PlanNode nodeToCompare = subPlan1.branchPlan.get(this.lastJoinedBranchNode); - return nodeToCompare == subPlan2.branchPlan.get(this.lastJoinedBranchNode); - } +// /** +// * Checks whether to candidate plans for the sub-plan of this node are comparable. 
The two +// * alternative plans are comparable, if +// * a) There is no branch in the sub-plan of this node +// * b) Both candidates have the same candidate as the child at the last open branch. +// * +// * @param subPlan1 +// * @param subPlan2 +// * @return +// */ +// protected boolean areBranchCompatible(PlanNode subPlan1, PlanNode subPlan2) +// { +// if (subPlan1 == null || subPlan2 == null) +// throw new CompilerException("SubPlans may not be null."); +// +// // if there is no open branch, the children are always compatible. +// // in most plans, that will be the dominant case +// if (this.lastJoinedBranchNode == null) { +// return true; +// } +// +// final PlanNode nodeToCompare = subPlan1.branchPlan.get(this.lastJoinedBranchNode); +// return nodeToCompare == subPlan2.branchPlan.get(this.lastJoinedBranchNode); +// } +// +// /** +// * Takes the given list of plans that are candidates for this node in the final plan and retains for each distinct +// * set of interesting properties only the cheapest plan. +// * +// * @param plans +// * The plans to prune. 
+// */ +// public void prunePlanAlternatives(List plans) { +// // shortcut for the case that there is only one plan +// if (plans.size() == 1) { +// return; +// } +// +// // if we have unjoined branches, split the list of plans such that only those +// // with the same candidates at the branch points are compared +// // otherwise, we may end up with the case that no compatible plans are found at +// // nodes that join +// if (this.openBranches == null) { +// prunePlansWithCommonBranchAlternatives(plans); +// } else { +// // TODO brute force still +// List result = new ArrayList(); +// List turn = new ArrayList(); +// +// while (!plans.isEmpty()) { +// turn.clear(); +// T determiner = plans.remove(plans.size() - 1); +// turn.add(determiner); +// +// for (int k = plans.size() - 1; k >= 0; k--) { +// boolean equal = true; +// T toCheck = plans.get(k); +// +// for (int b = 0; b < this.openBranches.size(); b++) { +// OptimizerNode brancher = this.openBranches.get(b).branchingNode; +// OptimizerNode cand1 = determiner.branchPlan.get(brancher); +// OptimizerNode cand2 = toCheck.branchPlan.get(brancher); +// if (cand1 != cand2) { +// equal = false; +// break; +// } +// } +// +// if (equal) { +// turn.add(plans.remove(k)); +// } +// } +// +// // now that we have only plans with the same branch alternatives, prune! 
+// if (turn.size() > 1) { +// prunePlansWithCommonBranchAlternatives(turn); +// } +// result.addAll(turn); +// } +// +// // after all turns are complete +// plans.clear(); +// plans.addAll(result); +// } +// } +// +// private final void prunePlansWithCommonBranchAlternatives(List plans) { +// List> toKeep = new ArrayList>(this.intProps.size()); // for each interesting property, which plans +// // are cheapest +// for (int i = 0; i < this.intProps.size(); i++) { +// toKeep.add(null); +// } +// +// T cheapest = null; // the overall cheapest plan +// +// // go over all plans from the list +// for (T candidate : plans) { +// // check if that plan is the overall cheapest +// if (cheapest == null || (cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) { +// cheapest = candidate; +// } +// +// // find the interesting properties that this plan matches +// for (int i = 0; i < this.intProps.size(); i++) { +// if (this.intProps.get(i).isMetBy(candidate)) { +// // the candidate meets them +// if (toKeep.get(i) == null) { +// // first one to meet the interesting properties, so store it +// List l = new ArrayList(2); +// l.add(candidate); +// toKeep.set(i, l); +// } else { +// // others met that one before +// // see if that one is more expensive and not more general than +// // one of the others. If so, drop it. +// List l = toKeep.get(i); +// boolean met = false; +// boolean replaced = false; +// +// for (int k = 0; k < l.size(); k++) { +// T other = l.get(k); +// +// // check if the candidate is both cheaper and at least as general +// if (other.getGlobalProperties().isMetBy(candidate.getGlobalProperties()) +// && other.getLocalProperties().isMetBy(candidate.getLocalProperties()) +// && other.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) { +// // replace that one with the candidate +// l.set(k, replaced ? 
null : candidate); +// replaced = true; +// met = true; +// } else { +// // check if the previous plan is more general and not more expensive than the candidate +// met |= (candidate.getGlobalProperties().isMetBy(other.getGlobalProperties()) +// && candidate.getLocalProperties().isMetBy(other.getLocalProperties()) && candidate +// .getCumulativeCosts().compareTo(other.getCumulativeCosts()) >= 0); +// } +// } +// +// if (!met) { +// l.add(candidate); +// } +// } +// } +// } +// } +// +// // all plans are set now +// plans.clear(); +// +// // add the cheapest plan +// if (cheapest != null) { +// plans.add(cheapest); +// cheapest.pFlag = true; // remember that that plan is in the set +// } +// +// Costs cheapestCosts = cheapest.cumulativeCosts; +// +// // add all others, which are optimal for some interesting properties +// for (int i = 0; i < toKeep.size(); i++) { +// List l = toKeep.get(i); +// +// if (l != null) { +// Costs maxDelta = this.intProps.get(i).getMaximalCosts(); +// +// for (T plan : l) { +// if (plan != null && !plan.pFlag) { +// plan.pFlag = true; +// +// // check, if that plan is not more than the delta above the costs of the +// if (!cheapestCosts.isOtherMoreThanDeltaAbove(plan.getCumulativeCosts(), maxDelta)) { +// plans.add(plan); +// } +// } +// } +// } +// } +// +// // reset the flags +// for (T p : plans) { +// p.pFlag = false; +// } +// } - /** - * Takes the given list of plans that are candidates for this node in the final plan and retains for each distinct - * set of interesting properties only the cheapest plan. - * - * @param plans - * The plans to prune. 
- */ - public void prunePlanAlternatives(List plans) { - // shortcut for the case that there is only one plan - if (plans.size() == 1) { - return; - } - - // if we have unjoined branches, split the list of plans such that only those - // with the same candidates at the branch points are compared - // otherwise, we may end up with the case that no compatible plans are found at - // nodes that join - if (this.openBranches == null) { - prunePlansWithCommonBranchAlternatives(plans); - } else { - // TODO brute force still - List result = new ArrayList(); - List turn = new ArrayList(); - - while (!plans.isEmpty()) { - turn.clear(); - T determiner = plans.remove(plans.size() - 1); - turn.add(determiner); - - for (int k = plans.size() - 1; k >= 0; k--) { - boolean equal = true; - T toCheck = plans.get(k); - - for (int b = 0; b < this.openBranches.size(); b++) { - OptimizerNode brancher = this.openBranches.get(b).branchingNode; - OptimizerNode cand1 = determiner.branchPlan.get(brancher); - OptimizerNode cand2 = toCheck.branchPlan.get(brancher); - if (cand1 != cand2) { - equal = false; - break; - } - } - - if (equal) { - turn.add(plans.remove(k)); - } - } - - // now that we have only plans with the same branch alternatives, prune! 
- if (turn.size() > 1) { - prunePlansWithCommonBranchAlternatives(turn); - } - result.addAll(turn); - } - - // after all turns are complete - plans.clear(); - plans.addAll(result); - } - } + // -------------------------------------------------------------------------------------------- + // Miscellaneous + // -------------------------------------------------------------------------------------------- - private final void prunePlansWithCommonBranchAlternatives(List plans) { - List> toKeep = new ArrayList>(this.intProps.size()); // for each interesting property, which plans - // are cheapest - for (int i = 0; i < this.intProps.size(); i++) { - toKeep.add(null); - } - - T cheapest = null; // the overall cheapest plan - - // go over all plans from the list - for (T candidate : plans) { - // check if that plan is the overall cheapest - if (cheapest == null || (cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) { - cheapest = candidate; - } - - // find the interesting properties that this plan matches - for (int i = 0; i < this.intProps.size(); i++) { - if (this.intProps.get(i).isMetBy(candidate)) { - // the candidate meets them - if (toKeep.get(i) == null) { - // first one to meet the interesting properties, so store it - List l = new ArrayList(2); - l.add(candidate); - toKeep.set(i, l); - } else { - // others met that one before - // see if that one is more expensive and not more general than - // one of the others. If so, drop it. - List l = toKeep.get(i); - boolean met = false; - boolean replaced = false; - - for (int k = 0; k < l.size(); k++) { - T other = l.get(k); - - // check if the candidate is both cheaper and at least as general - if (other.getGlobalProperties().isMetBy(candidate.getGlobalProperties()) - && other.getLocalProperties().isMetBy(candidate.getLocalProperties()) - && other.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) { - // replace that one with the candidate - l.set(k, replaced ? 
null : candidate); - replaced = true; - met = true; - } else { - // check if the previous plan is more general and not more expensive than the candidate - met |= (candidate.getGlobalProperties().isMetBy(other.getGlobalProperties()) - && candidate.getLocalProperties().isMetBy(other.getLocalProperties()) && candidate - .getCumulativeCosts().compareTo(other.getCumulativeCosts()) >= 0); - } - } - - if (!met) { - l.add(candidate); - } - } - } - } - } - - // all plans are set now - plans.clear(); - - // add the cheapest plan - if (cheapest != null) { - plans.add(cheapest); - cheapest.pFlag = true; // remember that that plan is in the set - } - - Costs cheapestCosts = cheapest.cumulativeCosts; - - // add all others, which are optimal for some interesting properties - for (int i = 0; i < toKeep.size(); i++) { - List l = toKeep.get(i); - - if (l != null) { - Costs maxDelta = this.intProps.get(i).getMaximalCosts(); - - for (T plan : l) { - if (plan != null && !plan.pFlag) { - plan.pFlag = true; - - // check, if that plan is not more than the delta above the costs of the - if (!cheapestCosts.isOtherMoreThanDeltaAbove(plan.getCumulativeCosts(), maxDelta)) { - plans.add(plan); - } - } - } - } + public void updatePropertiesWithUniqueSets(Set
uniqueFieldCombinations) + { + if (uniqueFieldCombinations.isEmpty()) { + return; + } else if (uniqueFieldCombinations.size() > 1) { + PactCompiler.LOG.warn("Node has multiple unique field combinations. " + + "The compiler can currently exploit only the first one as a hint."); } - - // reset the flags - for (T p : plans) { - p.pFlag = false; + + final FieldSet unique = uniqueFieldCombinations.iterator().next(); + if (this.localProps.getUniqueFields() == null) { + this.localProps.setUniqueFields(unique); } } - - // -------------------------------------------------------------------------------------------- } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SingleInputPlanNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SingleInputPlanNode.java index 4d12e9f36619c5282347923ddf7cc6f69cb0cea3..08818d9ac04fd23a14f2d13db8fe03fff5833725 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SingleInputPlanNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SingleInputPlanNode.java @@ -21,6 +21,8 @@ import java.util.NoSuchElementException; import eu.stratosphere.pact.common.plan.Visitor; import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.Costs; +import eu.stratosphere.pact.compiler.plan.OptimizerNode; +import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; /** @@ -28,13 +30,16 @@ import eu.stratosphere.pact.compiler.Costs; * * @author Stephan Ewen */ -public abstract class SingleInputPlanNode extends PlanNode +public class SingleInputPlanNode extends PlanNode { protected final Channel input; // -------------------------------------------------------------------------------------------- - protected SingleInputPlanNode(Channel input) { + + public SingleInputPlanNode(OptimizerNode template, Channel input, LocalStrategy localStrategy) + { + super(template, 
localStrategy); this.input = input; } @@ -93,4 +98,30 @@ public abstract class SingleInputPlanNode extends PlanNode } }; } + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.candidate.PlanNode#getInputs() + */ + @Override + public Iterator getInputs() { + return new Iterator() { + private boolean hasLeft = true; + @Override + public boolean hasNext() { + return this.hasLeft; + } + @Override + public Channel next() { + if (this.hasLeft) { + this.hasLeft = false; + return SingleInputPlanNode.this.input; + } else + throw new NoSuchElementException(); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SourcePlanNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SourcePlanNode.java new file mode 100644 index 0000000000000000000000000000000000000000..6b596738e830ad2e13c062ee995b67f641741ca0 --- /dev/null +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SourcePlanNode.java @@ -0,0 +1,68 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.compiler.plan.candidate; + +import java.util.Collections; +import java.util.Iterator; + +import eu.stratosphere.pact.common.plan.Visitor; +import eu.stratosphere.pact.compiler.plan.OptimizerNode; +import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy; + +/** + * Plan candidate node for data flow sources that have no input and no special strategies. + * + * @author Stephan Ewen + */ +public class SourcePlanNode extends PlanNode +{ + /** + * Constructs a new source candidate node that uses NONE as its local strategy. + * + * @param template The template optimizer node that this candidate is created for. + */ + public SourcePlanNode(OptimizerNode template) { + super(template, LocalStrategy.NONE); + } + + // -------------------------------------------------------------------------------------------- + + /* (non-Javadoc) + * @see eu.stratosphere.pact.common.plan.Visitable#accept(eu.stratosphere.pact.common.plan.Visitor) + */ + @Override + public void accept(Visitor visitor) { + if (visitor.preVisit(this)) { + visitor.postVisit(this); + } + } + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.candidate.PlanNode#getPredecessors() + */ + @Override + public Iterator getPredecessors() { + return Collections.emptyList().iterator(); + } + + /* (non-Javadoc) + * @see eu.stratosphere.pact.compiler.plan.candidate.PlanNode#getInputs() + */ + @Override + public Iterator getInputs() { + return Collections.emptyList().iterator(); + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java index e560cfb879aeaabdeef36777ca636cdd009a5329..1cc88d869effdde05eefaa139fdf5b6113dd3f65 100644 --- 
a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java @@ -18,7 +18,7 @@ package eu.stratosphere.pact.runtime.shipping; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.pact.common.generic.types.TypeComparator; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; -import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; /** * @author Erik Nijkamp diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/PactRecordOutputEmitter.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/PactRecordOutputEmitter.java index 932768eebe91bfcc63b1ad86a493023c70e98126..b0dc8ded15b279212e89316f49767b39aa0ac08b 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/PactRecordOutputEmitter.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/PactRecordOutputEmitter.java @@ -20,7 +20,7 @@ import eu.stratosphere.pact.common.contract.DataDistribution; import eu.stratosphere.pact.common.type.Key; import eu.stratosphere.pact.common.type.PactRecord; import eu.stratosphere.pact.runtime.plugable.PactRecordComparator; -import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; /** * @author Erik Nijkamp diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategy.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategy.java deleted file mode 100644 index da80f6faebdb0359233f4998f64bbe8d7b9944d8..0000000000000000000000000000000000000000 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategy.java +++ /dev/null @@ -1,120 +0,0 @@ 
-/*********************************************************************************************************************** - * - * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ - -package eu.stratosphere.pact.runtime.shipping; - -import eu.stratosphere.pact.common.util.FieldList; - -/** - * Enumeration defining the different shipping types of the output, such as local forward, re-partitioning by hash, - * or re-partitioning by range. 
- */ -public abstract class ShipStrategy { - - public enum ShipStrategyType { - FORWARD, - PARTITION_HASH, - PARTITION_LOCAL_HASH, - PARTITION_RANGE, - PARTITION_LOCAL_RANGE, - BROADCAST, - SFR, - NONE - } - - private ShipStrategyType type; - - private ShipStrategy(ShipStrategyType type) { - this.type = type; - } - - public ShipStrategyType type() { - return this.type; - } - - public String name() { - return this.type.name(); - } - - // ------------------ SHIP STRATEGIES --------------- - - public static class ForwardSS extends ShipStrategy { - - public ForwardSS() { super(ShipStrategyType.FORWARD); } - } - - public static class BroadcastSS extends ShipStrategy { - - public BroadcastSS() { super(ShipStrategyType.BROADCAST); } - } - - public static class SFRSS extends ShipStrategy { - - public SFRSS() { super(ShipStrategyType.SFR); } - } - - public static class NoneSS extends ShipStrategy { - - public NoneSS() { super(ShipStrategyType.NONE); } - } - - public static abstract class PartitionShipStrategy extends ShipStrategy { - - private FieldList partitionFields; - - private PartitionShipStrategy(ShipStrategyType type, FieldList partitionFields) { - super(type); - this.partitionFields = partitionFields; - } - - public FieldList getPartitionFields() { - return this.partitionFields; - } - - } - - public static class PartitionHashSS extends PartitionShipStrategy { - - public PartitionHashSS(FieldList partitionFields) { - super(ShipStrategyType.PARTITION_HASH, partitionFields); - } - - } - - public static class PartitionLocalHashSS extends PartitionShipStrategy { - - public PartitionLocalHashSS(FieldList partitionFields) { - super(ShipStrategyType.PARTITION_LOCAL_HASH, partitionFields); - } - - } - - public static class PartitionRangeSS extends PartitionShipStrategy { - - public PartitionRangeSS(FieldList partitionFields) { - super(ShipStrategyType.PARTITION_RANGE, partitionFields); - } - - } - - public static class PartitionLocalRangeSS extends PartitionShipStrategy { - 
- public PartitionLocalRangeSS(FieldList partitionFields) { - super(ShipStrategyType.PARTITION_LOCAL_RANGE, partitionFields); - } - - } - -} \ No newline at end of file diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java new file mode 100644 index 0000000000000000000000000000000000000000..35ad5088bafed24ef9582e8a0e6967945aaeaa82 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java @@ -0,0 +1,33 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.shipping; + +/** + * Enumeration defining the different shipping types of the output, such as local forward, re-partitioning by hash, + * or re-partitioning by range. 
+ * + * @author Stephan Ewen + */ +public enum ShipStrategyType +{ + NONE, + FORWARD, + PARTITION_HASH, + PARTITION_LOCAL_HASH, + PARTITION_RANGE, + PARTITION_LOCAL_RANGE, + BROADCAST, +} \ No newline at end of file diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java index f16ebbc52cea3d365b250b3a5277ddd32c365497..8b62e0f721dd7a36cc0bbeb581cba36625e3ecf7 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java @@ -58,7 +58,7 @@ import eu.stratosphere.pact.runtime.shipping.OutputCollector; import eu.stratosphere.pact.runtime.shipping.OutputEmitter; import eu.stratosphere.pact.runtime.shipping.PactRecordOutputCollector; import eu.stratosphere.pact.runtime.shipping.PactRecordOutputEmitter; -import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; import eu.stratosphere.pact.runtime.task.chaining.ChainedDriver; import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException; import eu.stratosphere.pact.runtime.task.util.NepheleReaderIterator; diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java index 6938de05346c528e06f1472e9e13be1e69b97395..e2d1556073a400ddd6199d3194d7b74d98e18c99 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java @@ -32,7 +32,7 @@ import eu.stratosphere.pact.common.generic.types.TypePairComparatorFactory; import eu.stratosphere.pact.common.generic.types.TypeSerializerFactory; import 
eu.stratosphere.pact.common.stubs.Stub; import eu.stratosphere.pact.common.util.InstantiationUtil; -import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; import eu.stratosphere.pact.runtime.task.PactDriver; import eu.stratosphere.pact.runtime.task.chaining.ChainedDriver; diff --git a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java index dabea0ab0938f861a7079f9b7dc87db6333aaf1a..3070762c341cb551ba7df6e43595c7678afc85df 100644 --- a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java +++ b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java @@ -40,7 +40,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger; import eu.stratosphere.pact.common.type.base.PactString; import eu.stratosphere.pact.runtime.plugable.PactRecordComparator; import eu.stratosphere.pact.runtime.shipping.PactRecordOutputEmitter; -import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; public class OutputEmitterTest extends TestCase { diff --git a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java index 07536ff361283989703feb8a626e7ff1600676c6..5913e62d6882f8d4c62e611bbc472a705f1db35d 100644 --- a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java +++ b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java @@ -32,7 +32,7 @@ import eu.stratosphere.pact.common.io.FileOutputFormat; import eu.stratosphere.pact.common.stubs.Stub; import eu.stratosphere.pact.common.type.PactRecord; import 
eu.stratosphere.pact.common.util.MutableObjectIterator; -import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; import eu.stratosphere.pact.runtime.task.DataSinkTask; import eu.stratosphere.pact.runtime.task.DataSourceTask; import eu.stratosphere.pact.runtime.task.PactDriver;