提交 47c14935 编写于 作者: S sewen

Optimizer Refactoring continued.

WARNING: Broken
上级 2414e7f5
......@@ -19,7 +19,6 @@ import java.util.ArrayList;
import eu.stratosphere.pact.common.type.Key;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.common.util.FieldSet;
/**
*
......@@ -164,10 +163,6 @@ public class Ordering
return true;
}
/**
 * Checks this ordering's field indexes against the given field set by delegating to
 * the field set's <code>isValidSubset</code> check.
 *
 * @param fieldSet The field set whose subset check is applied to this ordering's indexes.
 * @return The result of <code>fieldSet.isValidSubset(this.indexes)</code>.
 */
public boolean groupsFieldSet(FieldSet fieldSet) {
return fieldSet.isValidSubset(this.indexes);
}
/**
* Creates a new ordering that represents an ordering on a prefix of the fields. If the
* exclusive index up to which to create the ordering is <code>0</code>, then there is
......
......@@ -97,6 +97,15 @@ public class FieldSet implements Iterable<Integer>
return this.collection.iterator();
}
/**
 * Copies the field indexes of this set into a newly allocated array. The array has
 * exactly as many entries as the set has elements and lists them in the iteration
 * order of the underlying collection.
 *
 * @return A new array holding all field indexes of this set.
 */
public int[] toArray() {
    final int[] result = new int[this.collection.size()];
    int pos = 0;
    for (Integer field : this.collection) {
        result[pos] = field.intValue();
        pos++;
    }
    return result;
}
// --------------------------------------------------------------------------------------------
/**
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.pact.compiler;
/**
* An exception that is thrown by the pact compiler when encountering an illegal condition.
*
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
* @author Stephan Ewen
*/
public class CompilerException extends RuntimeException {
......@@ -64,5 +64,4 @@ public class CompilerException extends RuntimeException {
public CompilerException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -17,7 +17,10 @@ package eu.stratosphere.pact.compiler;
import eu.stratosphere.pact.common.contract.Order;
import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
/**
* This class represents global properties of the data. Global properties are properties that
......@@ -28,9 +31,9 @@ import eu.stratosphere.pact.compiler.plan.OptimizerNode;
*/
public final class GlobalProperties implements Cloneable
{
private PartitionProperty partitioning; // the type partitioning
private PartitioningProperty partitioning; // the type partitioning
private OptimizerFieldSet partitioningFields; // the fields which are partitioned
private FieldSet partitioningFields; // the fields which are partitioned
private Ordering ordering; // order of the partitioned fields, if it is an ordered (range) range partitioning
......@@ -40,52 +43,65 @@ public final class GlobalProperties implements Cloneable
* Initializes the global properties with no partitioning.
*/
public GlobalProperties() {
this.partitioning = PartitionProperty.NONE;
}
/**
* @param partitioning
* @param ordering
*/
public GlobalProperties(PartitionProperty partitioning, Ordering ordering) {
this.partitioning = partitioning;
this.ordering = ordering;
this.partitioning = PartitioningProperty.RANDOM;
}
/**
* @param partitioning
* @param partitioningFields
* @param partitioning2
* @param newFields
*/
public GlobalProperties(PartitionProperty partitioning, OptimizerFieldSet partitioningFields) {
private GlobalProperties(PartitioningProperty partitioning, FieldSet fields) {
this.partitioning = partitioning;
this.partitioningFields = partitioningFields;
this.partitioningFields = fields;
}
// --------------------------------------------------------------------------------------------
/**
* Sets the partitioning property for the global properties.
*
* @param partitioning
* The new partitioning to set.
* @param partitioning The new partitioning to set.
* @param partitionedFields
*/
public void setPartitioning(PartitionProperty partitioning, OptimizerFieldSet partitionedFields) {
this.partitioning = partitioning;
public void setHashPartitioned(FieldSet partitionedFields) {
this.partitioning = PartitioningProperty.HASH_PARTITIONED;
this.partitioningFields = partitionedFields;
this.ordering = null;
}
public void setPartitioning(PartitionProperty partitioning, Ordering ordering) {
this.partitioning = partitioning;
public void setRangePartitioned(Ordering ordering) {
this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
this.ordering = ordering;
this.partitioningFields = null;
}
/**
 * Sets the partitioning to {@link PartitioningProperty#ANY_PARTITIONING} on the given
 * fields. A previously set ordering is discarded.
 *
 * @param partitionedFields The fields on which the data is partitioned.
 */
public void setAnyPartitioning(FieldSet partitionedFields) {
    this.ordering = null;
    this.partitioningFields = partitionedFields;
    this.partitioning = PartitioningProperty.ANY_PARTITIONING;
}
/**
 * Sets the partitioning to {@link PartitioningProperty#RANDOM} and discards any
 * previously set partitioning fields and ordering.
 */
public void setRandomDistribution() {
    this.ordering = null;
    this.partitioningFields = null;
    this.partitioning = PartitioningProperty.RANDOM;
}
/**
 * Sets the partitioning to {@link PartitioningProperty#FULL_REPLICATION} and discards
 * any previously set partitioning fields and ordering.
 */
public void setFullyReplicated() {
    this.ordering = null;
    this.partitioningFields = null;
    this.partitioning = PartitioningProperty.FULL_REPLICATION;
}
/**
* Gets the partitioning property.
*
* @return The partitioning property.
*/
public PartitionProperty getPartitioning() {
public PartitioningProperty getPartitioning() {
return partitioning;
}
......@@ -94,7 +110,7 @@ public final class GlobalProperties implements Cloneable
*
* @return The partitioning fields.
*/
public OptimizerFieldSet getPartitionedFields() {
public FieldSet getPartitionedFields() {
return this.partitioningFields;
}
......@@ -111,14 +127,14 @@ public final class GlobalProperties implements Cloneable
* Checks, if the properties in this object are trivial, i.e. only standard values.
*/
public boolean isTrivial() {
return partitioning == PartitionProperty.NONE;
return partitioning == PartitioningProperty.RANDOM;
}
/**
* This method resets the properties to a state where no properties are given.
*/
public void reset() {
this.partitioning = PartitionProperty.NONE;
this.partitioning = PartitioningProperty.RANDOM;
this.ordering = null;
this.partitioningFields = null;
}
......@@ -140,8 +156,8 @@ public final class GlobalProperties implements Cloneable
}
}
} else if (this.partitioningFields != null) {
for (ColumnWithType col : this.partitioningFields) {
if (!node.isFieldConstant(input, col.getColumnIndex())) {
for (int colIndex : this.partitioningFields) {
if (!node.isFieldConstant(input, colIndex)) {
return null;
}
}
......@@ -163,17 +179,17 @@ public final class GlobalProperties implements Cloneable
else if (partitioningFields != null) {
boolean allIn = true;
boolean atLeasOneIn = false;
for (ColumnWithType col : this.partitioningFields) {
boolean res = node.isFieldConstant(input, col.getColumnIndex());
for (int col : this.partitioningFields) {
boolean res = node.isFieldConstant(input, col);
allIn &= res;
atLeasOneIn |= res;
}
if (allIn) {
return this;
} else if (atLeasOneIn) {
OptimizerFieldSet newFields = new OptimizerFieldSet();
for (ColumnWithType nc : this.partitioningFields) {
if (node.isFieldConstant(input, nc.getColumnIndex())) {
FieldSet newFields = new FieldSet();
for (Integer nc : this.partitioningFields) {
if (node.isFieldConstant(input, nc)) {
newFields.add(nc);
}
}
......@@ -196,12 +212,16 @@ public final class GlobalProperties implements Cloneable
*/
public boolean isMetBy(GlobalProperties other)
{
if (this.partitioning == PartitionProperty.NONE) {
if (this.partitioning == PartitioningProperty.FULL_REPLICATION || other.partitioning == PartitioningProperty.FULL_REPLICATION) {
return other.partitioning == this.partitioning;
}
if (this.partitioning == PartitioningProperty.RANDOM) {
return true;
}
if (this.partitioning == PartitionProperty.ANY) {
if (other.partitioning == PartitionProperty.NONE) {
if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) {
if (other.partitioning == PartitioningProperty.RANDOM) {
return false;
}
} else if (other.partitioning != this.partitioning) {
......@@ -233,6 +253,30 @@ public final class GlobalProperties implements Cloneable
throw new RuntimeException("Found a partitioning property, but no fields.");
}
}
/**
 * Parameterizes the ship strategy fields of a channel such that the channel produces the desired global properties.
 * <p>
 * The mapping from the partitioning property to the ship strategy is:
 * <ul>
 *   <li>no partitioning set (<code>null</code>) or <code>RANDOM</code>: forward</li>
 *   <li><code>FULL_REPLICATION</code>: broadcast</li>
 *   <li><code>ANY_PARTITIONING</code> or <code>HASH_PARTITIONED</code>: hash partitioning on the
 *       partitioning fields (in ascending field order)</li>
 *   <li><code>RANGE_PARTITIONED</code>: range partitioning on the fields and sort directions of the ordering</li>
 * </ul>
 *
 * @param channel The channel to parameterize.
 * @throws CompilerException Thrown, if the partitioning property is none of the kinds listed above.
 */
public void parameterizeChannel(Channel channel)
{
    if (this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM) {
        channel.setShipStrategy(ShipStrategyType.FORWARD);
        return;
    }

    switch (this.partitioning) {
    case FULL_REPLICATION:
        channel.setShipStrategy(ShipStrategyType.BROADCAST);
        // Must not fall through into the hash partitioning case: that would replace the
        // broadcast strategy and dereference the partitioning fields, which are null
        // for fully replicated data.
        break;
    case ANY_PARTITIONING:
    case HASH_PARTITIONED:
        // an unspecified partitioning is realized as a hash partitioning
        channel.setShipStrategy(ShipStrategyType.PARTITION_HASH, Utils.createOrderedFromSet(this.partitioningFields));
        break;
    case RANGE_PARTITIONED:
        channel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
        break;
    default:
        throw new CompilerException("Unsupported partitioning property: " + this.partitioning);
    }
}
// ------------------------------------------------------------------------
......
......@@ -19,6 +19,8 @@ import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
* This class represents local properties of the data. A local property is a property that exists
......@@ -28,9 +30,9 @@ import eu.stratosphere.pact.compiler.plan.OptimizerNode;
*/
public final class LocalProperties implements Cloneable
{
private Ordering ordering; // order inside a partition, null if not ordered
private Ordering ordering; // order inside a partition, null if not ordered
private OptimizerFieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
private FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
private FieldSet uniqueFields; // fields whose value combination is unique in the stream
......@@ -52,7 +54,7 @@ public final class LocalProperties implements Cloneable
* @param groupedFields The grouped fields for these local properties.
* @param uniqueFields The unique fields for these local properties.
*/
private LocalProperties(Ordering ordering, OptimizerFieldSet groupedFields, FieldSet uniqueFields) {
private LocalProperties(Ordering ordering, FieldSet groupedFields, FieldSet uniqueFields) {
this.ordering = ordering;
this.groupedFields = groupedFields;
this.uniqueFields = uniqueFields;
......@@ -84,7 +86,7 @@ public final class LocalProperties implements Cloneable
*
* @return The grouped fields, or <code>null</code> if nothing is grouped.
*/
public OptimizerFieldSet getGroupedFields() {
public FieldSet getGroupedFields() {
return this.groupedFields;
}
......@@ -93,7 +95,7 @@ public final class LocalProperties implements Cloneable
*
* @param groupedFields The fields that are grouped in these data properties.
*/
public void setGroupedFields(OptimizerFieldSet groupedFields) {
public void setGroupedFields(FieldSet groupedFields) {
this.groupedFields = groupedFields;
}
......@@ -226,11 +228,29 @@ public final class LocalProperties implements Cloneable
if (this.uniqueFields != null) {
// we demand field uniqueness
throw new RuntimeException("Uniqueness as a required property is not supported.");
throw new CompilerException("Uniqueness as a required property is not supported.");
}
return true;
}
/**
 * Configures the local strategy of the given channel so that the channel establishes these
 * local properties. An ordering takes precedence over a grouping; both are realized by a sort.
 * If neither an ordering nor grouped fields are set, the channel is left untouched.
 *
 * @param channel The channel to parameterize.
 * @throws CompilerException Thrown, if unique fields are set, because uniqueness cannot be
 *                           produced as a required property.
 */
public void parameterizeChannel(Channel channel)
{
    if (this.uniqueFields != null) {
        throw new CompilerException("Uniqueness as a required property is not supported.");
    }

    if (this.ordering != null) {
        // sort on the ordering's fields with the ordering's sort directions
        channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
        return;
    }

    if (this.groupedFields == null) {
        // nothing required locally
        return;
    }

    // grouping is established by sorting on the grouped fields
    channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields));
}
// ------------------------------------------------------------------------
......@@ -286,23 +306,14 @@ public final class LocalProperties implements Cloneable
public LocalProperties clone() {
LocalProperties newProps = new LocalProperties();
if (this.ordering != null) {
newProps.ordering = this.ordering.clone();
newProps.ordering = this.ordering.clone();
}
if (this.groupedFields != null) {
newProps.groupedFields = this.groupedFields.clone();
newProps.groupedFields = this.groupedFields.clone();
}
if (this.uniqueFields != null) {
newProps.uniqueFields = this.uniqueFields.clone();
newProps.uniqueFields = this.uniqueFields.clone();
}
return newProps;
}
/**
 * Convenience alternative to {@link #clone()} for creating a copy of these properties.
 *
 * @return A copy of this object, as produced by {@link #clone()}.
 */
public final LocalProperties createCopy() {
    final LocalProperties copy = clone();
    return copy;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import eu.stratosphere.pact.common.type.Key;
/**
 * A collection of {@link ColumnWithType} entries, i.e. of column indexes paired with their key type.
 * In this base class the entries are kept in a {@link HashSet}; subclasses may supply a different
 * backing collection by overriding {@link #initCollection()}.
 */
public class OptimizerFieldSet implements Iterable<ColumnWithType>
{
    /** The backing collection holding the columns of this set. */
    protected final Collection<ColumnWithType> collection;

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a new, empty field set. The backing collection is obtained from
     * {@link #initCollection()}, so overriding implementations must not rely on subclass state.
     */
    public OptimizerFieldSet() {
        this.collection = initCollection();
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Adds the given column to this set.
     *
     * @param columnIndex The column to add.
     */
    public void add(ColumnWithType columnIndex) {
        this.collection.add(columnIndex);
    }

    /**
     * Adds a column described by its index and key type to this set.
     *
     * @param columnIndex The index of the column.
     * @param columnType The key type of the column.
     */
    public void add(int columnIndex, Class<? extends Key> columnType) {
        add(new ColumnWithType(columnIndex, columnType));
    }

    /**
     * Adds all columns from the given collection to this set.
     *
     * @param columnIndexes The columns to add.
     */
    public void addAll(Collection<ColumnWithType> columnIndexes) {
        this.collection.addAll(columnIndexes);
    }

    /**
     * Adds all columns of the given field set to this set.
     *
     * @param set The field set whose columns are added.
     */
    public void addAll(OptimizerFieldSet set) {
        for (ColumnWithType i : set) {
            add(i);
        }
    }

    /**
     * Checks whether this set contains the given column.
     *
     * @param columnIndex The column to look for.
     * @return True, if the column is contained, false otherwise.
     */
    public boolean contains(ColumnWithType columnIndex) {
        return this.collection.contains(columnIndex);
    }

    /**
     * Gets the number of columns in this set.
     *
     * @return The number of columns.
     */
    public int size() {
        return this.collection.size();
    }

    /* (non-Javadoc)
     * @see java.lang.Iterable#iterator()
     */
    @Override
    public Iterator<ColumnWithType> iterator() {
        return this.collection.iterator();
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Checks if the given set of fields is a valid subset of this set of fields. For unordered
     * sets, this is the case if all of the given set's fields are also part of this field.
     * <p>
     * Subclasses that describe field sets where the field order matters must override this method
     * to implement a field ordering sensitive check.
     *
     * @param set The set that is a candidate subset.
     * @return True, if the given set is a subset of this set, false otherwise.
     */
    public boolean isValidSubset(OptimizerFieldSet set) {
        // a larger set can never be a subset
        if (set.size() > size()) {
            return false;
        }
        for (ColumnWithType i : set) {
            if (!contains(i)) {
                return false;
            }
        }
        return true;
    }

    // --------------------------------------------------------------------------------------------

    /* (non-Javadoc)
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        return this.collection.hashCode();
    }

    /* (non-Javadoc)
     * @see java.lang.Object#equals(java.lang.Object)
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (obj instanceof OptimizerFieldSet) {
            return this.collection.equals(((OptimizerFieldSet) obj).collection);
        } else {
            return false;
        }
    }

    /**
     * Renders this set as <code>&lt;prefix&gt; [c1, c2, ...]</code>, where the prefix comes from
     * {@link #getDescriptionPrefix()}. An empty set is rendered as <code>&lt;prefix&gt; []</code>.
     *
     * @return The string representation of this set.
     */
    @Override
    public String toString() {
        StringBuilder bld = new StringBuilder();
        bld.append(getDescriptionPrefix()).append(' ').append('[');

        // Write the separator before every element but the first, rather than trimming a
        // trailing separator afterwards: trimming would cut into the prefix for an empty set.
        boolean first = true;
        for (ColumnWithType i : this.collection) {
            if (!first) {
                bld.append(',');
                bld.append(' ');
            }
            bld.append(i);
            first = false;
        }

        bld.append(']');
        return bld.toString();
    }

    /**
     * Creates a new <code>OptimizerFieldSet</code> containing all columns of this set. The column
     * objects themselves are not copied.
     *
     * @return A new field set with the same columns.
     */
    public OptimizerFieldSet clone() {
        OptimizerFieldSet set = new OptimizerFieldSet();
        set.addAll(this.collection);
        return set;
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Creates the backing collection. Called from the constructor.
     *
     * @return A new, empty {@link HashSet}.
     */
    protected Collection<ColumnWithType> initCollection() {
        return new HashSet<ColumnWithType>();
    }

    /**
     * Gets the prefix used by {@link #toString()}.
     *
     * @return The description prefix.
     */
    protected String getDescriptionPrefix() {
        return "Field Set";
    }
}
......@@ -57,6 +57,7 @@ import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.ReduceNode;
import eu.stratosphere.pact.compiler.plan.SinkJoiner;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
/**
* The optimizer that takes the user specified pact plan and creates an optimized plan that contains
......@@ -670,13 +671,13 @@ public class PactCompiler {
BranchesVisitor branchingVisitor = new BranchesVisitor();
rootNode.accept(branchingVisitor);
//
// // the final step is now to generate the actual plan alternatives
// List<? extends OptimizerNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
//
// if (bestPlan.size() != 1) {
// throw new CompilerException("Error in compiler: more than one best plan was created!");
// }
// the final step is now to generate the actual plan alternatives
List<? extends PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
if (bestPlan.size() != 1) {
throw new CompilerException("Error in compiler: more than one best plan was created!");
}
//
// // check if the best plan's root is a data sink (single sink plan)
// // if so, directly take it. if it is a sink joiner node, get its contained sinks
......@@ -899,7 +900,7 @@ public class PactCompiler {
// that computation must happen during the last descend.
if (node.haveAllOutputConnectionInterestingProperties() && node.getInterestingProperties() == null) {
node.computeInterestingProperties();
node.computeUnionOfInterestingPropertiesFromSuccessors();
node.computeInterestingPropertiesForInputs(this.estimator);
return true;
} else {
......
......@@ -16,15 +16,16 @@
package eu.stratosphere.pact.compiler;
/**
* An enumeration tracking the different types of partitioning.
* An enumeration tracking the different types of sharding strategies.
*
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
* @author Stephan Ewen
*/
public enum PartitionProperty {
public enum PartitioningProperty {
/**
* Constant indicating no partitioning.
* Constant indicating no particular partitioning (i.e. random)
*/
NONE,
RANDOM,
/**
* Constant indicating a hash partitioning.
......@@ -39,17 +40,41 @@ public enum PartitionProperty {
/**
* Constant indicating any not further specified partitioning.
*/
ANY;
ANY_PARTITIONING,
/**
* Constant indicating full replication of the data.
*/
FULL_REPLICATION;
/**
* Checks, if this property represents in fact a partitioning. That is,
* whether this property is not equal to <tt>PartitionProperty.NONE</tt>.
* whether this property is not equal to <tt>PartitionProperty.FULL_REPLICATION</tt>.
*
* @return True, if this enum constant is unequal to <tt>PartitionProperty.NONE</tt>,
* @return True, if this enum constant is unequal to <tt>PartitionProperty.FULL_REPLICATION</tt>,
* false otherwise.
*/
public boolean isPartitioned() {
return this != PartitionProperty.NONE;
return this != FULL_REPLICATION;
}
/**
 * Checks, if this property represents a full replication.
 *
 * @return True, if this enum constant is equal to <tt>PartitioningProperty.FULL_REPLICATION</tt>,
 *         false otherwise.
 */
public boolean isReplication() {
return this == FULL_REPLICATION;
}
/**
 * Checks if this property represents a partitioning that is not random, but on a partitioning key.
 * That is the case when {@link #isPartitioned()} holds and this constant is not <tt>RANDOM</tt>.
 *
 * @return True, if the data is partitioned on a key, false otherwise.
 */
public boolean isPartitionedOnKey() {
return isPartitioned() && this != RANDOM;
}
/**
......@@ -63,17 +88,6 @@ public enum PartitionProperty {
* @return True, if this enum constant is a re-computable partitioning.
*/
public boolean isComputablyPartitioned() {
return this == HASH_PARTITIONED || this == PartitionProperty.RANGE_PARTITIONED;
}
/**
* Checks whether this partition property is compatible with another one.
*
* @param other
* The other partition property to check against.
* @return True, if this partition property is compatible with the other, false if not.
*/
public boolean isCompatibleWith(PartitionProperty other) {
return other == this && this != ANY && this != NONE;
return this == HASH_PARTITIONED || this == RANGE_PARTITIONED;
}
}
......@@ -15,101 +15,29 @@
package eu.stratosphere.pact.compiler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Arrays;
import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.common.util.FieldSet;
/**
*
*
* @author Stephan Ewen
*/
public class OptimizerFieldList extends OptimizerFieldSet
public class Utils
{
public OptimizerFieldList() {
super();
}
// --------------------------------------------------------------------------------------------
public ColumnWithType get(int pos) {
return get().get(pos);
}
// --------------------------------------------------------------------------------------------
/* (non-Javadoc)
* @see eu.stratosphere.pact.common.util.FieldSet#isValidSubset(eu.stratosphere.pact.common.util.FieldSet)
*/
@Override
public boolean isValidSubset(OptimizerFieldSet set) {
if (set instanceof OptimizerFieldList) {
return (isValidSubset((OptimizerFieldList) set));
} else {
return false;
}
}
public boolean isValidSubset(OptimizerFieldList list) {
if (list.size() > size()) {
return false;
}
final List<ColumnWithType> myList = get();
final List<ColumnWithType> theirList = list.get();
for (int i = 0; i < theirList.size(); i++) {
ColumnWithType myInt = myList.get(i);
ColumnWithType theirInt = theirList.get(i);
if (!myInt.equals(theirInt)) {
return false;
}
}
return true;
}
public boolean isValidUnorderedPrefix(OptimizerFieldSet set) {
if (set.size() > size()) {
return false;
}
List<ColumnWithType> list = get();
for (int i = 0; i < set.size(); i++) {
if (!set.contains(list.get(i))) {
return false;
}
}
return true;
public static final FieldList createOrderedFromSet(FieldSet set) {
final int[] cols = set.toArray();
Arrays.sort(cols);
return new FieldList(cols);
}
// --------------------------------------------------------------------------------------------
/* (non-Javadoc)
* @see eu.stratosphere.pact.common.util.AbstractFieldSet#initCollection()
*/
@Override
protected Collection<ColumnWithType> initCollection() {
return new ArrayList<ColumnWithType>();
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.common.util.FieldSet#getDescriptionPrefix()
/**
* No instantiation.
*/
@Override
protected String getDescriptionPrefix() {
return "Field List";
}
private List<ColumnWithType> get() {
return (List<ColumnWithType>) this.collection;
}
// --------------------------------------------------------------------------------------------
public static OptimizerFieldList getFromOrdering(Ordering ordering) {
final OptimizerFieldList list = new OptimizerFieldList();
for (int i = 0; i < ordering.getNumberOfFields(); i++) {
list.add(ordering.getFieldNumber(i), ordering.getType(i));
}
return list;
}
private Utils() {}
}
......@@ -83,8 +83,6 @@ public abstract class CostEstimator {
case BROADCAST:
addBroadcastCost(channel, channel.getReplicationFactor(), costs);
break;
case SFR:
throw new CompilerException("Symmetric-Fragment-And-Replicate Strategy currently not supported.");
default:
throw new CompilerException("Unknown shipping strategy for input: " + channel.getShipStrategy());
}
......
......@@ -30,7 +30,7 @@ import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.PartitionProperty;
import eu.stratosphere.pact.compiler.PartitioningProperty;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.PactConnection;
......@@ -345,7 +345,7 @@ public class JSONGenerator implements Visitor<OptimizerNode> {
this.jsonString.append(",\n\t\t\"global_properties\": [\n");
addProperty(jsonString, "Partitioning", gp.getPartitioning().name(), true);
if (gp.getPartitioning() != PartitionProperty.NONE) {
if (gp.getPartitioning() != PartitioningProperty.NONE) {
addProperty(jsonString, "Partitioned on", gp.getPartitionedFields().toString(), false);
}
if (gp.getOrdering() != null) {
......
......@@ -33,7 +33,7 @@ import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.PartitionProperty;
import eu.stratosphere.pact.compiler.PartitioningProperty;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ForwardSS;
......@@ -204,7 +204,7 @@ public class CoGroupNode extends TwoInputNode {
}
// partition and any order
p.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)keyFields.clone());
p.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, (FieldList)keyFields.clone());
Ordering ordering = new Ordering();
for (Integer index : getPactContract().getKeyColumnNumbers(inputNum)) {
......@@ -221,7 +221,7 @@ public class CoGroupNode extends TwoInputNode {
// partition only
p = new InterestingProperties();
p.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)keyFields.clone());
p.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, (FieldList)keyFields.clone());
estimator.getHashPartitioningCost(input, p.getMaximalCosts());
InterestingProperties.mergeUnionOfInterestingProperties(target, p);
}
......@@ -277,14 +277,14 @@ public class CoGroupNode extends TwoInputNode {
// 2 alternatives:
// 1) re-partition 2 the same way as 1
// 2) re-partition 1 the same way as 2
if (gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED
&& gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
if (gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED
&& gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new ForwardSS(),
new PartitionHashSS(this.keySet2), estimator);
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1),
new ForwardSS(), estimator);
} else if (gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED
&& gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
} else if (gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED
&& gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new ForwardSS(),
new PartitionRangeSS(this.keySet2), estimator);
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1),
......@@ -307,7 +307,7 @@ public class CoGroupNode extends TwoInputNode {
// we create an additional plan with a range partitioning
// if this is not already a range partitioning
if (gp1.getPartitioning() != PartitionProperty.RANGE_PARTITIONED) {
if (gp1.getPartitioning() != PartitioningProperty.RANGE_PARTITIONED) {
// createCoGroupAlternative(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE,
// ShipStrategy.PARTITION_RANGE, estimator);
}
......@@ -317,12 +317,12 @@ public class CoGroupNode extends TwoInputNode {
// add two plans:
// 1) make input 2 the same partitioning as input 1
// 2) partition both inputs with a different partitioning function (hash <-> range)
if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, ss1,
new PartitionHashSS(this.keySet2), estimator);
// createCoGroupAlternative(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE,
// ShipStrategy.PARTITION_RANGE, estimator);
} else if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, ss1,
new PartitionRangeSS(this.keySet2), estimator);
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1),
......@@ -339,12 +339,12 @@ public class CoGroupNode extends TwoInputNode {
// add two plans:
// 1) make input 1 the same partitioning as input 2
// 2) partition both inputs with a different partitioning function (hash <-> range)
if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), ss2,
estimator);
// createCoGroupAlternative(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE,
// ShipStrategy.PARTITION_RANGE, estimator);
} else if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1), ss2,
estimator);
createCoGroupAlternative(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1),
......@@ -375,10 +375,10 @@ public class CoGroupNode extends TwoInputNode {
case FORWARD:
if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning().isPartitioned()) {
// adapt to the partitioning
if (gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
//TODO check other input for partitioining
ss1 = new PartitionHashSS(this.keySet1);
} else if (gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
ss1 = new PartitionRangeSS(this.keySet1);
} else {
throw new CompilerException();
......@@ -389,11 +389,11 @@ public class CoGroupNode extends TwoInputNode {
}
break;
case PARTITION_HASH:
ss1 = (partitioningIsOnSameSubkey(gp1.getPartitionedFields(), this.keySet2) && gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) ? new ForwardSS()
ss1 = (partitioningIsOnSameSubkey(gp1.getPartitionedFields(), this.keySet2) && gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) ? new ForwardSS()
: new PartitionHashSS(this.keySet1);
break;
case PARTITION_RANGE:
ss1 = (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) ? new ForwardSS()
ss1 = (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) ? new ForwardSS()
: new PartitionRangeSS(this.keySet1);
break;
default:
......@@ -417,9 +417,9 @@ public class CoGroupNode extends TwoInputNode {
case FORWARD:
if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning().isPartitioned()) {
// adapt to the partitioning
if (gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
ss2 = new PartitionHashSS(this.keySet2);
} else if (gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
ss2 = new PartitionRangeSS(this.keySet2);
} else {
throw new CompilerException();
......@@ -430,11 +430,11 @@ public class CoGroupNode extends TwoInputNode {
}
break;
case PARTITION_HASH:
ss2 = (partitioningIsOnSameSubkey(this.keySet1, gp2.getPartitionedFields()) && gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) ? new ForwardSS()
ss2 = (partitioningIsOnSameSubkey(this.keySet1, gp2.getPartitionedFields()) && gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) ? new ForwardSS()
: new PartitionHashSS(this.keySet2);
break;
case PARTITION_RANGE:
ss2 = (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) ? new ForwardSS()
ss2 = (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) ? new ForwardSS()
: new PartitionRangeSS(this.keySet2);
break;
default:
......@@ -740,7 +740,7 @@ public class CoGroupNode extends TwoInputNode {
throw new CompilerException("Invalid input number "+inputNum+" for CoGroup.");
}
if (gp.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
if (gp.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
return keyFields.equals(partitionedFields);
}
......
......@@ -26,7 +26,7 @@ import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.PartitionProperty;
import eu.stratosphere.pact.compiler.PartitioningProperty;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
......@@ -228,7 +228,7 @@ public class DataSinkNode extends OptimizerNode {
// in both cases create a range partitioned only IP
InterestingProperties partitioningProps = new InterestingProperties();
partitioningProps.getGlobalProperties().setPartitioning(PartitionProperty.RANGE_PARTITIONED, partitioning);
partitioningProps.getGlobalProperties().setPartitioning(PartitioningProperty.RANGE_PARTITIONED, partitioning);
estimator.addRangePartitionCost(this.input, partitioningProps.getMaximalCosts());
this.input.addInterestingProperties(partitioningProps);
......@@ -241,7 +241,7 @@ public class DataSinkNode extends OptimizerNode {
// global sort case: create IP for range partitioned and sorted
InterestingProperties globalSortProps = new InterestingProperties();
globalSortProps.getGlobalProperties().setPartitioning(PartitionProperty.RANGE_PARTITIONED, partitioning);
globalSortProps.getGlobalProperties().setPartitioning(PartitioningProperty.RANGE_PARTITIONED, partitioning);
estimator.addRangePartitionCost(this.input, globalSortProps.getMaximalCosts());
globalSortProps.getLocalProperties().setOrdering(partitioning);
......
......@@ -15,6 +15,7 @@
package eu.stratosphere.pact.compiler.plan;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
......@@ -27,19 +28,24 @@ import eu.stratosphere.pact.common.generic.io.InputFormat;
import eu.stratosphere.pact.common.io.statistics.BaseStatistics;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.compiler.plan.candidate.SourcePlanNode;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
* The Optimizer representation of a data source.
* The optimizer's internal representation of a data source.
*
* @author Stephan Ewen
*/
public class DataSourceNode extends OptimizerNode
{
private long inputSize; //the size of the input in bytes
private List<PlanNode> candidate; // the candidate (there can only be one) for this node
private long inputSize; //the size of the input in bytes
/**
* Creates a new DataSourceNode for the given contract.
......@@ -249,38 +255,23 @@ public class DataSourceNode extends OptimizerNode
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeAlternativePlans()
*/
@Override
public List<OptimizerNode> getAlternativePlans(CostEstimator estimator) {
// if (this.cachedPlans != null) {
// return this.cachedPlans;
// }
//
// GlobalProperties gp = new GlobalProperties();
// LocalProperties lp = new LocalProperties();
//
// // first, compute the properties of the output
//// if (getOutputContract() == OutputContract.UniqueKey) {
//// gp.setKeyUnique(true);
//// gp.setPartitioning(PartitionProperty.ANY);
////
//// lp.setKeyUnique(true);
//// lp.setKeysGrouped(true);
//// }
//
// DataSourceNode candidate = new DataSourceNode(this, gp, lp);
//
// // compute the costs
// candidate.setCosts(new Costs(0, this.inputSize));
//
// // since there is only a single plan for the data-source, return a list with that element only
// List<OptimizerNode> plans = new ArrayList<OptimizerNode>(1);
// plans.add(candidate);
//
// if (isBranching()) {
// this.cachedPlans = plans;
// }
//
// return plans;
return null;
public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
	// a previously stored plan list is handed out again unchanged
	final List<PlanNode> cached = this.candidate;
	if (cached != null) {
		return cached;
	}

	// build the one and only plan candidate for this source
	final SourcePlanNode sourcePlan = new SourcePlanNode(this);
	sourcePlan.updatePropertiesWithUniqueSets(getUniqueFields());
	sourcePlan.setCosts(new Costs(0, this.inputSize));

	// a data source has a single plan, so the result list holds exactly that element
	final List<PlanNode> result = new ArrayList<PlanNode>(1);
	result.add(sourcePlan);

	// the list is only remembered for later calls when this node is branching
	if (isBranching()) {
		this.candidate = result;
	}
	return result;
}
......
......@@ -21,7 +21,10 @@ import java.util.List;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
* The interesting properties that a node in the optimizer plan hands to its predecessors. It has the
......@@ -46,7 +49,7 @@ public class InterestingProperties implements Cloneable
*/
public InterestingProperties() {
// instantiate the maximal costs to the possible maximum
this.maximalCosts = new Costs(Long.MAX_VALUE, Long.MAX_VALUE);
this.maximalCosts = new Costs(0, 0);
this.globalProps = new GlobalProperties();
this.localProps = new LocalProperties();
......@@ -123,12 +126,15 @@ public class InterestingProperties implements Cloneable
return globalProps.isMetBy(node.getGlobalProperties()) && localProps.isMetBy(node.getLocalProperties());
}
public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) {
GlobalProperties gp = this.globalProps.filterByNodesConstantSet(node, input);
LocalProperties lp = this.localProps.filterByNodesConstantSet(node, input);
public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input)
{
final GlobalProperties gp = this.globalProps.filterByNodesConstantSet(node, input);
final LocalProperties lp = this.localProps.filterByNodesConstantSet(node, input);
if (gp != this.globalProps || lp != this.localProps) {
if (gp == null && lp == null) {
if ((gp == null || gp.isTrivial()) && (lp == null || lp.isTrivial())) {
return null;
} else {
return new InterestingProperties(this.maximalCosts,
......@@ -138,6 +144,24 @@ public class InterestingProperties implements Cloneable
return this;
}
}
/**
 * Creates a channel from the given input node and configures it so that these
 * interesting properties are addressed.
 * <p>
 * If the input node's global properties already meet the global properties of this
 * object, the channel is set to {@link ShipStrategyType#FORWARD}; otherwise the global
 * properties parameterize the channel. Afterwards, the local properties that the channel
 * reports after shipping only are checked: if they meet the local properties of this
 * object, the channel's local strategy is set to {@link LocalStrategy#NONE}; otherwise
 * the local properties parameterize the channel.
 *
 * @param inputNode The plan node that is the source of the new channel.
 * @return The newly created and configured channel.
 */
public Channel createChannelRealizingProperties(PlanNode inputNode) {
	final Channel channel = new Channel(inputNode);

	// global side first: the local check below depends on the channel's shipping setup
	final boolean globalsMet = this.globalProps.isMetBy(inputNode.getGlobalProperties());
	if (!globalsMet) {
		this.globalProps.parameterizeChannel(channel);
	} else {
		channel.setShipStrategy(ShipStrategyType.FORWARD);
	}

	// local side: compare against what the channel provides after shipping only
	final boolean localsMet = this.localProps.isMetBy(channel.getLocalPropertiesAfterShippingOnly());
	if (!localsMet) {
		this.localProps.parameterizeChannel(channel);
	} else {
		channel.setLocalStrategy(LocalStrategy.NONE);
	}

	return channel;
}
// ------------------------------------------------------------------------
......@@ -188,7 +212,7 @@ public class InterestingProperties implements Cloneable
@Override
public InterestingProperties clone() {
return new InterestingProperties(this.maximalCosts.clone(),
this.globalProps.clone(), this.localProps.createCopy());
this.globalProps.clone(), this.localProps.clone());
}
// ------------------------------------------------------------------------
......@@ -223,7 +247,7 @@ public class InterestingProperties implements Cloneable
}
}
// if it was not subsumed, add it
properties.add(toMerge.clone());
properties.add(toMerge);
}
/**
......@@ -249,6 +273,12 @@ public class InterestingProperties implements Cloneable
}
/**
* @param props
* @param node
* @param input
* @return
*/
public static final List<InterestingProperties> filterInterestingPropertiesForInput(
List<InterestingProperties> props, OptimizerNode node, int input)
{
......@@ -272,6 +302,6 @@ public class InterestingProperties implements Cloneable
new InterestingProperties(filteredProps.getMaximalCosts(), topDownAdjustedGP, filteredProps.localProps);
mergeUnionOfInterestingProperties(preserved, toAdd);
}
return preserved;
return preserved == null ? new ArrayList<InterestingProperties>() : preserved;
}
}
......@@ -19,15 +19,18 @@ import java.util.List;
import eu.stratosphere.pact.common.contract.MapContract;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.compiler.plan.candidate.SingleInputPlanNode;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
* The Optimizer representation of a <i>Map</i> contract node.
* The optimizer's internal representation of a <i>Map</i> contract node.
*
* @author Stephan Ewen
*/
public class MapNode extends SingleInputNode {
public class MapNode extends SingleInputNode
{
/**
* Creates a new MapNode for the given contract.
*
......@@ -39,28 +42,6 @@ public class MapNode extends SingleInputNode {
setLocalStrategy(LocalStrategy.NONE);
}
// /**
// * Copy constructor to create a copy a MapNode with a different predecessor. The predecessor
// * is assumed to be of the same type and merely a copy with different strategies, as they
// * are created in the process of the plan enumeration.
// *
// * @param template
// * The node to create a copy of.
// * @param pred
// * The new predecessor.
// * @param conn
// * The old connection to copy properties from.
// * @param globalProps
// * The global properties of this copy.
// * @param localProps
// * The local properties of this copy.
// */
// protected MapNode(MapNode template, OptimizerNode pred, PactConnection conn, GlobalProperties globalProps,
// LocalProperties localProps) {
// super(template, pred, conn, globalProps, localProps);
// setLocalStrategy(LocalStrategy.NONE);
// }
/**
* Gets the contract object for this map node.
*
......@@ -109,60 +90,26 @@ public class MapNode extends SingleInputNode {
}
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.SingleInputNode#computeValidPlanAlternatives(java.util.List, eu.stratosphere.pact.compiler.costs.CostEstimator, java.util.List)
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.SingleInputNode#createPlanAlternatives(java.util.List, java.util.List)
*/
@Override
protected void computeValidPlanAlternatives(List<? extends OptimizerNode> altSubPlans, CostEstimator estimator, List<OptimizerNode> outputPlans) {
//
// // we have to check if all input ShipStrategies are the same or at least compatible
// ShipStrategy ss = new NoneSS();
//
// // check hint shipping strategy
// ShipStrategy hintSS = this.inConn.getShipStrategy();
// if(hintSS.type() == ShipStrategyType.BROADCAST || hintSS.type() == ShipStrategyType.SFR)
// // invalid strategy: we do not produce an alternative node
// return;
// else
// ss = hintSS;
//
// // if no hint for a strategy was provided, we use the default
// if(ss.type() == ShipStrategyType.NONE)
// ss = new ForwardSS();
//
// for(OptimizerNode subPlan : altSubPlans) {
//
// GlobalProperties gp = PactConnection.getGlobalPropertiesAfterConnection(subPlan, this, 0, ss);
// LocalProperties lp = PactConnection.getLocalPropertiesAfterConnection(subPlan, this, ss);
//
// MapNode nMap = new MapNode(this, subPlan, this.inConn, gp, lp);
// nMap.inConn.setShipStrategy(ss);
//
// // now, the properties (copied from the inputs) are filtered by the output contracts
// nMap.getGlobalProperties().filterByNodesConstantSet(this, 0);
// nMap.getLocalProperties().filterByNodesConstantSet(this, 0);
//
// // copy the cumulative costs and set the costs of the map itself to zero
// estimator.costOperator(nMap);
//
// outputPlans.add(nMap);
// }
protected void createPlanAlternatives(List<Channel> inputs, List<PlanNode> outputPlans) {
	// every input channel yields exactly one candidate that uses this node's local strategy
	for (final Channel input : inputs) {
		final SingleInputPlanNode alternative = new SingleInputPlanNode(this, input, this.localStrategy);
		outputPlans.add(alternative);
	}
}
/**
* Computes the number of stub calls.
*
* @return the number of stub calls.
*/
protected long computeNumberOfStubCalls() {
if(this.getPredNode() != null)
return this.getPredNode().estimatedNumRecords;
if (getPredecessorNode() != null)
return getPredecessorNode().estimatedNumRecords;
else
return -1;
}
}
......@@ -32,7 +32,7 @@ import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.PartitionProperty;
import eu.stratosphere.pact.compiler.PartitioningProperty;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.BroadcastSS;
......@@ -75,10 +75,6 @@ public class MatchNode extends TwoInputNode {
setLocalStrategy(LocalStrategy.HYBRIDHASH_FIRST);
} else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
setLocalStrategy(LocalStrategy.HYBRIDHASH_SECOND);
} else if (PactCompiler.HINT_LOCAL_STRATEGY_INMEM_HASH_BUILD_FIRST.equals(localStrategy)) {
setLocalStrategy(LocalStrategy.MMHASH_FIRST);
} else if (PactCompiler.HINT_LOCAL_STRATEGY_INMEM_HASH_BUILD_SECOND.equals(localStrategy)) {
setLocalStrategy(LocalStrategy.MMHASH_SECOND);
} else if (PactCompiler.HINT_LOCAL_STRATEGY_SORT_SELF_NESTEDLOOP.equals(localStrategy)) {
setLocalStrategy(LocalStrategy.SORT_SELF_NESTEDLOOP);
} else if (PactCompiler.HINT_LOCAL_STRATEGY_SELF_NESTEDLOOP.equals(localStrategy)) {
......@@ -91,30 +87,30 @@ public class MatchNode extends TwoInputNode {
}
}
/**
* Copy constructor to create a copy of a node with different predecessors. The predecessors
* is assumed to be of the same type as in the template node and merely copies with different
* strategies, as they are created in the process of the plan enumeration.
*
* @param template
* The node to create a copy of.
* @param pred1
* The new predecessor for the first input.
* @param pred2
* The new predecessor for the second input.
* @param conn1
* The old connection of the first input to copy properties from.
* @param conn2
* The old connection of the second input to copy properties from.
* @param globalProps
* The global properties of this copy.
* @param localProps
* The local properties of this copy.
*/
protected MatchNode(MatchNode template, OptimizerNode pred1, OptimizerNode pred2, PactConnection conn1,
PactConnection conn2, GlobalProperties globalProps, LocalProperties localProps) {
super(template, pred1, pred2, conn1, conn2, globalProps, localProps);
}
// /**
// * Copy constructor to create a copy of a node with different predecessors. The predecessors
// * is assumed to be of the same type as in the template node and merely copies with different
// * strategies, as they are created in the process of the plan enumeration.
// *
// * @param template
// * The node to create a copy of.
// * @param pred1
// * The new predecessor for the first input.
// * @param pred2
// * The new predecessor for the second input.
// * @param conn1
// * The old connection of the first input to copy properties from.
// * @param conn2
// * The old connection of the second input to copy properties from.
// * @param globalProps
// * The global properties of this copy.
// * @param localProps
// * The local properties of this copy.
// */
// protected MatchNode(MatchNode template, OptimizerNode pred1, OptimizerNode pred2, PactConnection conn1,
// PactConnection conn2, GlobalProperties globalProps, LocalProperties localProps) {
// super(template, pred1, pred2, conn1, conn2, globalProps, localProps);
// }
// ------------------------------------------------------------------------
......@@ -150,8 +146,6 @@ public class MatchNode extends TwoInputNode {
case MERGE: return 1;
case HYBRIDHASH_FIRST: return 1;
case HYBRIDHASH_SECOND: return 1;
case MMHASH_FIRST: return 1;
case MMHASH_SECOND: return 1;
case SORT_SELF_NESTEDLOOP: return 2;
case SELF_NESTEDLOOP: return 1;
default: return 0;
......@@ -237,7 +231,7 @@ public class MatchNode extends TwoInputNode {
}
// partition and any order
p.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)keys.clone());
p.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, (FieldList)keys.clone());
Ordering ordering = new Ordering();
for (Integer index : getPactContract().getKeyColumnNumbers(inputNum)) {
......@@ -254,7 +248,7 @@ public class MatchNode extends TwoInputNode {
// partition only
p = new InterestingProperties();
p.getGlobalProperties().setPartitioning(PartitionProperty.ANY, (FieldList)keys.clone());
p.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, (FieldList)keys.clone());
estimator.getHashPartitioningCost(input, p.getMaximalCosts());
InterestingProperties.mergeUnionOfInterestingProperties(target, p);
}
......@@ -353,17 +347,17 @@ public class MatchNode extends TwoInputNode {
// 1) re-partition 2 the same way as 1
// 2) re-partition 1 the same way as 2
if (gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
createLocalAlternatives(outputPlans, subPlan1, subPlan2, new ForwardSS(),
new PartitionHashSS(this.keySet2), estimator);
} else if (gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
createLocalAlternatives(outputPlans, subPlan1, subPlan2, new ForwardSS(),
new PartitionRangeSS(this.keySet2), estimator);
}
if (gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1),
new ForwardSS(), estimator);
} else if (gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1),
new ForwardSS(), estimator);
}
......@@ -384,7 +378,7 @@ public class MatchNode extends TwoInputNode {
// we create an additional plan with a range partitioning
// if this is not already a range partitioning
if (gp1.getPartitioning() != PartitionProperty.RANGE_PARTITIONED) {
if (gp1.getPartitioning() != PartitioningProperty.RANGE_PARTITIONED) {
createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1),
new PartitionRangeSS(this.keySet2), estimator);
}
......@@ -394,12 +388,12 @@ public class MatchNode extends TwoInputNode {
// add two plans:
// 1) make input 2 the same partitioning as input 1
// 2) partition both inputs with a different partitioning function (hash <-> range)
if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
createLocalAlternatives(outputPlans, subPlan1, subPlan2, ss1,
new PartitionHashSS(this.keySet2), estimator);
// createLocalAlternatives(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE,
// ShipStrategy.PARTITION_RANGE, estimator);
} else if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
createLocalAlternatives(outputPlans, subPlan1, subPlan2, ss1,
new PartitionRangeSS(this.keySet2), estimator);
createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1),
......@@ -415,12 +409,12 @@ public class MatchNode extends TwoInputNode {
// add two plans:
// 1) make input 1 the same partitioning as input 2
// 2) partition both inputs with a different partitioning function (hash <-> range)
if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1), ss2,
estimator);
// createLocalAlternatives(outputPlans, predList1, predList2, ShipStrategy.PARTITION_RANGE,
// ShipStrategy.PARTITION_RANGE, estimator);
} else if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionRangeSS(this.keySet1), ss2,
estimator);
createLocalAlternatives(outputPlans, subPlan1, subPlan2, new PartitionHashSS(this.keySet1),
......@@ -465,9 +459,9 @@ public class MatchNode extends TwoInputNode {
case FORWARD:
if (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning().isPartitioned()) {
// adapt to the partitioning
if (gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
ss1 = new PartitionHashSS(this.keySet1);
} else if (gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
ss1 = new PartitionRangeSS(this.keySet1);
} else {
throw new CompilerException();
......@@ -478,11 +472,11 @@ public class MatchNode extends TwoInputNode {
}
break;
case PARTITION_HASH:
ss1 = (partitioningIsOnSameSubkey(gp1.getPartitionedFields(), this.keySet2) && gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) ? new ForwardSS()
ss1 = (partitioningIsOnSameSubkey(gp1.getPartitionedFields(), this.keySet2) && gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) ? new ForwardSS()
: new PartitionHashSS(this.keySet1);
break;
case PARTITION_RANGE:
ss1 = (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) ? new ForwardSS()
ss1 = (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) ? new ForwardSS()
: new PartitionRangeSS(this.keySet1);
break;
default:
......@@ -510,9 +504,9 @@ public class MatchNode extends TwoInputNode {
case FORWARD:
if (partitioningIsOnRightFields(gp1, 0) && gp1.getPartitioning().isPartitioned()) {
// adapt to the partitioning
if (gp1.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (gp1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
ss2 = new PartitionHashSS(this.keySet2);
} else if (gp1.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
} else if (gp1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
ss2 = new PartitionRangeSS(this.keySet2);
} else {
throw new CompilerException();
......@@ -523,11 +517,11 @@ public class MatchNode extends TwoInputNode {
}
break;
case PARTITION_HASH:
ss2 = (partitioningIsOnSameSubkey(this.keySet1, gp2.getPartitionedFields()) && partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.HASH_PARTITIONED) ? new ForwardSS()
ss2 = (partitioningIsOnSameSubkey(this.keySet1, gp2.getPartitionedFields()) && partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) ? new ForwardSS()
: new PartitionHashSS(this.keySet2);
break;
case PARTITION_RANGE:
ss2 = (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) ? new ForwardSS()
ss2 = (partitioningIsOnRightFields(gp2, 1) && gp2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) ? new ForwardSS()
: new PartitionRangeSS(this.keySet2);
break;
default:
......@@ -1037,7 +1031,7 @@ public class MatchNode extends TwoInputNode {
default:
throw new CompilerException("Invalid input number "+inputNum+" for Match.");
}
if (gp.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
if (gp.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
return keyFields.equals(partitionedFields);
}
......
......@@ -35,6 +35,7 @@ import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.compiler.util.PactType;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
......@@ -211,7 +212,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
* The cost estimator used to estimate the costs of each plan alternative.
* @return A list containing all plan alternatives.
*/
public abstract List<? extends OptimizerNode> getAlternativePlans(CostEstimator estimator);
public abstract List<? extends PlanNode> getAlternativePlans(CostEstimator estimator);
/**
* This method implements the visit of a depth-first graph traversing visitor. Implementors must first
......@@ -498,21 +499,32 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
* This method returns copies of the original interesting properties objects and
* leaves the original objects, contained by the connections, unchanged.
*/
public void computeInterestingProperties()
public void computeUnionOfInterestingPropertiesFromSuccessors()
{
List<PactConnection> conns = getOutgoingConnections();
List<InterestingProperties> props = null;
for (PactConnection conn : conns) {
List<InterestingProperties> ips = conn.getInterestingProperties();
if (ips.size() > 0) {
if (props == null) {
props = new ArrayList<InterestingProperties>();
if (conns.size() == 0) {
// no outgoing connections, so there are no interesting properties to collect
this.intProps = Collections.<InterestingProperties>emptyList();
} else if (conns.size() == 1) {
// a single outgoing connection: no union needed, just copy its interesting properties
List<InterestingProperties> ips = conns.get(0).getInterestingProperties();
this.intProps = ips.isEmpty() ?
Collections.<InterestingProperties>emptyList() :
new ArrayList<InterestingProperties>(ips);
} else {
// union them
List<InterestingProperties> props = null;
for (PactConnection conn : conns) {
List<InterestingProperties> ips = conn.getInterestingProperties();
if (ips.size() > 0) {
if (props == null) {
props = new ArrayList<InterestingProperties>();
}
InterestingProperties.mergeUnionOfInterestingProperties(props, ips);
}
InterestingProperties.mergeUnionOfInterestingProperties(props, ips);
}
this.intProps = (props == null || props.isEmpty()) ? Collections.<InterestingProperties>emptyList() : props;
}
this.intProps = (props == null || props.isEmpty()) ? Collections.<InterestingProperties>emptyList() : props;
}
/**
......@@ -714,8 +726,10 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
{
if (this.outgoingConnections.size() == 1) {
// return our own stack of open branches, because nothing is added
if (this.openBranches == null) return null;
return new ArrayList<UnclosedBranchDescriptor>(this.openBranches);
if (this.openBranches == null)
return null;
else
return new ArrayList<UnclosedBranchDescriptor>(this.openBranches);
}
else if (this.outgoingConnections.size() > 1) {
// we branch: add a branch info to the stack
......@@ -749,11 +763,12 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
protected void removeClosedBranches(List<UnclosedBranchDescriptor> openList) {
if (openList == null || openList.isEmpty() || closedBranchingNodes == null || closedBranchingNodes.isEmpty()) return;
if (openList == null || openList.isEmpty() || this.closedBranchingNodes == null || this.closedBranchingNodes.isEmpty())
return;
Iterator<UnclosedBranchDescriptor> it = openList.iterator();
while (it.hasNext()) {
if (closedBranchingNodes.contains(it.next().getBranchingNode())) {
if (this.closedBranchingNodes.contains(it.next().getBranchingNode())) {
//this branch was already closed --> remove it from the list
it.remove();
}
......@@ -761,7 +776,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
}
protected void addClosedBranches(Set<OptimizerNode> alreadyClosed) {
if (alreadyClosed == null || alreadyClosed.isEmpty()) return;
if (alreadyClosed == null || alreadyClosed.isEmpty())
return;
if (this.closedBranchingNodes == null)
this.closedBranchingNodes = new HashSet<OptimizerNode>(alreadyClosed);
else
......
......@@ -25,7 +25,7 @@ import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
/**
* A connection between two PACTs. Represents a channel together with a data shipping
......@@ -78,29 +78,6 @@ public class PactConnection implements EstimateProvider
this.shipStrategy = shipStrategy;
}
// /**
// * Creates a copy of the given connection, but sets as source and target the given nodes.
// * This constructor is intended as a partial-copy constructor for the enumeration of
// * plans in the optimization phase.
// *
// * @param template
// * The connection to copy the properties from.
// * @param source
// * The reference to the source node.
// * @param target
// * The reference to the target node.
// */
// public PactConnection(PactConnection template, OptimizerNode source, OptimizerNode target) {
// this.sourcePact = source;
// this.targetPact = target;
//
// this.shipStrategy = template.shipStrategy;
// this.replicationFactor = template.replicationFactor;
// this.tempMode = template.tempMode;
//
// this.interestingProps = template.interestingProps;
// }
/**
* Gets the source of the connection.
*
......@@ -294,62 +271,6 @@ public class PactConnection implements EstimateProvider
return buf.toString();
}
/**
* Intended to derive the global properties of the source's output after it crossed a pact
* connection with the given shipping strategy.
* <p>
* NOTE: The derivation logic in the body is commented out. The only live statement throws a
* <tt>CompilerException</tt>, so at present every invocation fails and no properties are ever
* returned. (If <tt>shipMode</tt> is <code>null</code>, building the exception message fails
* with a <tt>NullPointerException</tt> instead.)
* <p>
* The disabled code sketches the intended rules:
* Global properties are maintained on <tt>FORWARD</tt> connections.
* If a partitioning happens, then a partitioning property exists afterwards.
* A <tt>BROADCAST</tt> connection resets the properties.
*
* @param source The node producing the data; unused by the live code.
* @param target The node consuming the data; unused by the live code.
* @param targetInputNum The index of the target's input; unused by both the live and the disabled code.
* @param shipMode The shipping strategy; only its name is used, for the exception message.
*
* @return Never returns normally in the current state of the code.
* @throws CompilerException On every invocation with a non-null <tt>shipMode</tt>.
*/
public static GlobalProperties getGlobalPropertiesAfterConnection(OptimizerNode source, OptimizerNode target, int targetInputNum, ShipStrategyType shipMode) {
// GlobalProperties gp = source.getGlobalPropertiesForParent(target);
//
// switch (shipMode.type()) {
// case BROADCAST:
// gp.reset();
// break;
// case PARTITION_RANGE:
// gp.setPartitioning(PartitionProperty.RANGE_PARTITIONED, ((PartitionShipStrategy)shipMode).getPartitionFields());
// gp.setOrdering(null);
// break;
// case PARTITION_HASH:
// gp.setPartitioning(PartitionProperty.HASH_PARTITIONED, ((PartitionShipStrategy)shipMode).getPartitionFields());
// gp.setOrdering(null);
// break;
// case FORWARD:
// if (source.getDegreeOfParallelism() > target.getDegreeOfParallelism()) {
// gp.setOrdering(null);
// }
//
// if (gp.getPartitioning() == PartitionProperty.NONE) {
// if (source.getUniqueFields().size() > 0) {
// FieldList partitionedFields = new FieldList();
// //TODO maintain a list of partitioned fields in global properties
// //Up to now: only add first unique fieldset
// for (Integer field : source.getUniqueFields().iterator().next()) {
// partitionedFields.add(field);
// }
// gp.setPartitioning(PartitionProperty.ANY, partitionedFields);
// }
// }
//
// // nothing else changes
// break;
// case NONE:
// throw new CompilerException(
// "Cannot determine properties after connection, id shipping strategy is not set.");
// case SFR:
// default:
// the only statement that is still active: the method unconditionally fails
throw new CompilerException("Unsupported shipping strategy: " + shipMode.name());
// }
//
// return gp;
}
/**
* Gets the local properties of the sources output after it crossed a pact connection with the given
* strategy. Local properties are only maintained on <tt>FORWARD</tt> connections.
......
......@@ -21,15 +21,14 @@ import java.util.List;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.common.contract.CompilerHints;
import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.PartitionProperty;
import eu.stratosphere.pact.compiler.PartitioningProperty;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
......@@ -172,14 +171,14 @@ public class ReduceNode extends SingleInputNode {
// add the first interesting properties: partitioned and grouped
InterestingProperties ip1 = new InterestingProperties();
ip1.getGlobalProperties().setPartitioning(PartitionProperty.ANY, this.keys);
ip1.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, this.keys);
ip1.getLocalProperties().setGroupedFields(this.keys);
estimator.addHashPartitioningCost(this.inConn, ip1.getMaximalCosts());
estimator.addLocalSortCost(this.inConn, -1, ip1.getMaximalCosts());
// add the second interesting properties: partitioned only
InterestingProperties ip2 = new InterestingProperties();
ip2.getGlobalProperties().setPartitioning(PartitionProperty.ANY, this.keys);
ip2.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, this.keys);
estimator.addHashPartitioningCost(this.inConn, ip2.getMaximalCosts());
InterestingProperties.mergeUnionOfInterestingProperties(props, ip1);
......@@ -188,8 +187,8 @@ public class ReduceNode extends SingleInputNode {
}
@Override
protected void computeValidPlanAlternatives(List<? extends OptimizerNode> altSubPlans,
CostEstimator estimator, List<OptimizerNode> outputPlans)
protected void computeValidPlanAlternatives(List<? extends PlanNode> altSubPlans,
CostEstimator estimator, List<PlanNode> outputPlans)
{
//
// FieldSet keySet = new FieldSet(getPactContract().getKeyColumnNumbers(0));
......@@ -319,7 +318,7 @@ public class ReduceNode extends SingleInputNode {
if(this.getPredNode() != null) {
// return key count of predecessor
return this.getPredNode().getEstimatedCardinality(new FieldSet(this.keyList));
return this.getPredNode().getEstimatedCardinality(this.keys);
} else
return -1;
}
......@@ -420,15 +419,14 @@ public class ReduceNode extends SingleInputNode {
@Override
public List<FieldSet> createUniqueFieldsForNode()
{
if (this.keyList != null) {
for (int keyField : this.keyList) {
if (this.keys != null) {
for (int keyField : this.keys) {
if (!isFieldConstant(0, keyField)) {
return null;
}
}
return Collections.singletonList(new FieldSet(keyList));
return Collections.singletonList(this.keys);
}
return null;
}
}
......@@ -27,96 +27,49 @@ import eu.stratosphere.pact.common.contract.SingleInputContract;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFields;
import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFieldsExcept;
import eu.stratosphere.pact.common.type.Key;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.ColumnWithType;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.OptimizerFieldSet;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
* A node in the optimizer plan that represents a PACT with a single input.
* A node in the optimizer's program representation for a PACT with a single input.
*
* @author Stephan Ewen
*/
public abstract class SingleInputNode extends OptimizerNode {
// ------------- Node Connection
public abstract class SingleInputNode extends OptimizerNode
{
protected PactConnection inConn; // the input of the node
protected PactConnection inConn; // the input of the node
// ------------- Stub Annotations
protected FieldSet constantSet; // set of fields that are left unchanged by the stub
protected FieldSet notConstantSet; // set of fields that are changed by the stub
protected FieldSet constantSet; // set of fields that are left unchanged by the stub
protected FieldSet notConstantSet; // set of fields that are changed by the stub
protected final FieldSet keys; // The set of key fields
protected final OptimizerFieldSet keys; // The set of key fields
private List<PlanNode> cachedPlans;
// ------------------------------
// --------------------------------------------------------------------------------------------
/**
* Creates a new node with a single input for the optimizer plan.
*
* @param pactContract
* The PACT that the node represents.
* @param pactContract The PACT that the node represents.
*/
public SingleInputNode(SingleInputContract<?> pactContract) {
super(pactContract);
this.keys = new OptimizerFieldSet();
int[] keyPos = pactContract.getKeyColumnNumbers(0);
Class<? extends Key>[] keyTypes = pactContract.getKeyClasses();
if (keyPos != null) {
for (int i = 0; i < keyPos.length; i++) {
this.keys.add(new ColumnWithType(keyPos[i], keyTypes[i]));
}
}
this.keys = new FieldSet(pactContract.getKeyColumnNumbers(0));
}
// /**
// * Copy constructor to create a copy of a node with a different predecessor. The predecessor
// * is assumed to be of the same type and merely a copy with different strategies, as they
// * are created in the process of the plan enumeration.
// *
// * @param template
// * The node to create a copy of.
// * @param predNode
// * The new predecessor.
// * @param inConn
// * The old connection to copy properties from.
// * @param globalProps
// * The global properties of this copy.
// * @param localProps
// * The local properties of this copy.
// */
// protected SingleInputNode(SingleInputNode template, OptimizerNode predNode, PactConnection inConn,
// GlobalProperties globalProps, LocalProperties localProps) {
// super(template, globalProps, localProps);
//
// // copy annotations
// this.constantSet = template.constantSet;
//
// // copy key set
// this.keyList = template.keyList;
//
// // copy input connection
// this.inConn = new PactConnection(inConn, predNode, this);
//
// if (this.branchPlan == null) {
// this.branchPlan = predNode.branchPlan;
// } else if (predNode.branchPlan != null) {
// this.branchPlan.putAll(predNode.branchPlan);
// }
// }
/**
* Gets the <tt>PactConnection</tt> through which this node receives its input.
*
* @return The input connection.
*/
public PactConnection getInConn() {
public PactConnection getIncomingConnection() {
return this.inConn;
}
......@@ -126,7 +79,7 @@ public abstract class SingleInputNode extends OptimizerNode {
* @param conn
* The input connection to set.
*/
public void setInConn(PactConnection inConn) {
public void setIncomingConnection(PactConnection inConn) {
this.inConn = inConn;
}
......@@ -135,8 +88,8 @@ public abstract class SingleInputNode extends OptimizerNode {
*
* @return The predecessor of this node.
*/
public OptimizerNode getPredNode() {
if(this.inConn != null) {
public OptimizerNode getPredecessorNode() {
if (this.inConn != null) {
return this.inConn.getSourcePact();
} else {
return null;
......@@ -174,7 +127,7 @@ public abstract class SingleInputNode extends OptimizerNode {
}
// create the connection and add it
PactConnection conn = new PactConnection(pred, this);
this.setInConn(conn);
setIncomingConnection(conn);
pred.addOutgoingConnection(conn);
// see if an internal hint dictates the strategy to use
......@@ -209,7 +162,7 @@ public abstract class SingleInputNode extends OptimizerNode {
*
* @return The key fields of this optimizer node.
*/
public OptimizerFieldSet getKeySet() {
public FieldSet getKeySet() {
return this.keys;
}
......@@ -222,39 +175,53 @@ public abstract class SingleInputNode extends OptimizerNode {
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getAlternativePlans()
*/
@Override
final public List<OptimizerNode> getAlternativePlans(CostEstimator estimator) {
// // check if we have a cached version
// if (this.cachedPlans != null) {
// return this.cachedPlans;
// }
//
// // calculate alternative subplans for predecessor
// List<? extends OptimizerNode> subPlans = this.getPredNode().getAlternativePlans(estimator);
final public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
// check if we have a cached version
if (this.cachedPlans != null) {
return this.cachedPlans;
}
List<OptimizerNode> outputPlans = new ArrayList<OptimizerNode>();
//
// computeValidPlanAlternatives(subPlans, estimator, outputPlans);
//
// // prune the plans
// prunePlanAlternatives(outputPlans);
//
// // cache the result only if we have multiple outputs --> this function gets invoked multiple times
// if (this.getOutConns() != null && this.getOutConns().size() > 1) {
// this.cachedPlans = outputPlans;
// }
// calculate alternative subplans for predecessor
List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
List<Channel> candidates = new ArrayList<Channel>(subPlans.size());
List<InterestingProperties> ips = this.inConn.getInterestingProperties();
for (PlanNode p : subPlans) {
if (ips.isEmpty()) {
// create a simple forwarding channel
Channel c = new Channel(p);
c.setShipStrategy(ShipStrategyType.FORWARD);
c.setLocalStrategy(LocalStrategy.NONE);
candidates.add(c);
} else {
for (InterestingProperties ip : ips) {
// create a channel that realizes the properties
candidates.add(ip.createChannelRealizingProperties(p));
}
}
}
List<PlanNode> outputPlans = new ArrayList<PlanNode>();
createPlanAlternatives(candidates, outputPlans);
// prune the plans
prunePlanAlternatives(outputPlans);
// cache the result only if we have multiple outputs --> this function gets invoked multiple times
if (isBranching()) {
this.cachedPlans = outputPlans;
}
return outputPlans;
}
/**
* Takes a list with all subplans and produces alternative plans for the current node
* Takes a list with all sub-plans and produces alternative plans for the current node.
*
* @param altSubPlans Alternative subplans
* @param estimator Cost estimator to be used
* @param outputPlans The generated output plans
* @param inputs The different input alternatives for the current node.
* @param outputPlans The generated output plan candidates.
*/
protected abstract void computeValidPlanAlternatives(List<? extends OptimizerNode> altSubPlans,
CostEstimator estimator, List<OptimizerNode> outputPlans);
protected abstract void createPlanAlternatives(List<Channel> inputs, List<PlanNode> outputPlans);
// --------------------------------------------------------------------------------------------
// Branch Handling
......@@ -270,17 +237,17 @@ public abstract class SingleInputNode extends OptimizerNode {
return;
}
addClosedBranches(this.getPredNode().closedBranchingNodes);
addClosedBranches(getPredecessorNode().closedBranchingNodes);
List<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
// TODO: check if merge of lists is really necessary
result = mergeLists(result, this.getPredNode().getBranchesForParent(this));
result = mergeLists(result, getPredecessorNode().getBranchesForParent(this));
this.openBranches = result;
}
// --------------------------------------------------------------------------------------------
// Stub Annotation Handling
// Estimates Computation
// --------------------------------------------------------------------------------------------
/**
......@@ -301,8 +268,8 @@ public abstract class SingleInputNode extends OptimizerNode {
final long numRecords = computeNumberOfStubCalls();
long outputSize = 0;
if (this.getPredNode() != null) {
outputSize = this.getPredNode().estimatedOutputSize;
if (getPredecessorNode() != null) {
outputSize = getPredecessorNode().estimatedOutputSize;
}
// compute width only if we have information
......@@ -368,8 +335,8 @@ public abstract class SingleInputNode extends OptimizerNode {
@Override
public void accept(Visitor<OptimizerNode> visitor) {
if (visitor.preVisit(this)) {
if (this.getPredNode() != null) {
this.getPredNode().accept(visitor);
if (getPredecessorNode() != null) {
getPredecessorNode().accept(visitor);
} else {
throw new CompilerException();
}
......
......@@ -32,7 +32,7 @@ import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.PartitionProperty;
import eu.stratosphere.pact.compiler.PartitioningProperty;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ForwardSS;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
......@@ -232,10 +232,10 @@ public class UnionNode extends OptimizerNode {
// only property which would survive is a hash partitioning on every input
GlobalProperties gpForInput = alternative.getGlobalPropertiesForParent(this);
if (index == 0 && gpForInput.getPartitioning() == PartitionProperty.HASH_PARTITIONED) {
if (index == 0 && gpForInput.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
newPartitionedFieldsInCommon = gpForInput.getPartitionedFields();
}
else if (gpForInput.getPartitioning() != PartitionProperty.HASH_PARTITIONED
else if (gpForInput.getPartitioning() != PartitioningProperty.HASH_PARTITIONED
|| gpForInput.getPartitionedFields().equals(partitionedFieldsInCommon) == false) {
newPartitionedFieldsInCommon = null;
}
......@@ -250,7 +250,7 @@ public class UnionNode extends OptimizerNode {
GlobalProperties gp = new GlobalProperties();
if (newPartitionedFieldsInCommon != null) {
gp.setPartitioning(PartitionProperty.HASH_PARTITIONED, newPartitionedFieldsInCommon);
gp.setPartitioning(PartitioningProperty.HASH_PARTITIONED, newPartitionedFieldsInCommon);
}
UnionNode unionNode = new UnionNode(this, subplanStack, gp, new LocalProperties());
unionNode.branchPlan = newBranchPlan;
......
......@@ -15,12 +15,21 @@
package eu.stratosphere.pact.compiler.plan.candidate;
import eu.stratosphere.pact.compiler.plan.EstimateProvider;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.pact.common.contract.Order;
import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.plan.EstimateProvider;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
*
*
* @author Stephan Ewen
*/
......@@ -37,11 +46,25 @@ public class Channel implements EstimateProvider
// --------------------------------------------------------------------------------------------
private final PlanNode source = null;
private PlanNode source;
private PlanNode target;
private ShipStrategyType shipStrategy = ShipStrategyType.NONE;
private LocalStrategy localStrategy = LocalStrategy.NONE;
private FieldList shipKeys;
private final PlanNode target = null;
private FieldList localKeys;
private ShipStrategyType shipStrategy;
private boolean[] shipSortOrder;
private boolean[] localSortOrder;
private GlobalProperties globalProps;
private LocalProperties localProps;
private TempMode tempMode;
......@@ -49,6 +72,12 @@ public class Channel implements EstimateProvider
// --------------------------------------------------------------------------------------------
/**
* Creates a new channel that originates at the given plan node. Only the source is
* assigned here; no other field is touched by this constructor.
*
* @param sourceNode The plan node that produces the data flowing through this channel.
*/
public Channel(PlanNode sourceNode) {
this.source = sourceNode;
}
// --------------------------------------------------------------------------------------------
/**
* Gets the source of this Channel.
*
......@@ -67,10 +96,43 @@ public class Channel implements EstimateProvider
return this.target;
}
/**
* Sets the ship strategy without keys and without sort directions. Delegates to
* {@link #setShipStrategy(ShipStrategyType, FieldList, boolean[])} with <code>null</code>
* for both the keys and the sort directions.
*
* @param strategy The ship strategy to use for this channel.
*/
public void setShipStrategy(ShipStrategyType strategy) {
setShipStrategy(strategy, null, null);
}
/**
* Sets the ship strategy together with its key fields, leaving the sort directions
* unspecified (<code>null</code>). Delegates to
* {@link #setShipStrategy(ShipStrategyType, FieldList, boolean[])}.
*
* @param strategy The ship strategy to use for this channel.
* @param keys The key fields the ship strategy operates on.
*/
public void setShipStrategy(ShipStrategyType strategy, FieldList keys) {
setShipStrategy(strategy, keys, null);
}
/**
 * Sets the ship strategy, its key fields, and the sort direction per key field.
 * <p>
 * Both the global and the local properties of this channel are computed lazily from the
 * ship strategy and then cached. They are discarded here so that a value computed for a
 * previously set strategy is never returned after the strategy changed.
 *
 * @param strategy The ship strategy to use for this channel.
 * @param keys The key fields the ship strategy operates on, or <code>null</code>.
 * @param sortDirection The sort direction for each key field (<code>true</code> is treated as
 *                      ascending when the range partitioning is described), or <code>null</code>.
 */
public void setShipStrategy(ShipStrategyType strategy, FieldList keys, boolean[] sortDirection) {
    this.shipStrategy = strategy;
    this.shipKeys = keys;
    this.shipSortOrder = sortDirection;

    // invalidate the lazily derived properties, they depend on the values set above
    this.globalProps = null;
    this.localProps = null;
}
/**
* Sets the local strategy without keys and without sort directions. Delegates to
* {@link #setLocalStrategy(LocalStrategy, FieldList, boolean[])} with <code>null</code>
* for both the keys and the sort directions.
*
* @param strategy The local strategy to apply on this channel.
*/
public void setLocalStrategy(LocalStrategy strategy) {
setLocalStrategy(strategy, null, null);
}
/**
* Sets the local strategy together with its key fields, leaving the sort directions
* unspecified (<code>null</code>). Delegates to
* {@link #setLocalStrategy(LocalStrategy, FieldList, boolean[])}.
*
* @param strategy The local strategy to apply on this channel.
* @param keys The key fields the local strategy operates on.
*/
public void setLocalStrategy(LocalStrategy strategy, FieldList keys) {
setLocalStrategy(strategy, keys, null);
}
/**
 * Sets the local strategy, its key fields, and the sort direction per key field.
 * <p>
 * The local properties of this channel are computed lazily from the local strategy and then
 * cached. The cached value is discarded here so that it is re-derived for the new strategy.
 *
 * @param strategy The local strategy to apply on this channel.
 * @param keys The key fields the local strategy operates on, or <code>null</code>.
 * @param sortDirection The sort direction for each key field (<code>true</code> is treated as
 *                      ascending when the ordering is described), or <code>null</code>.
 */
public void setLocalStrategy(LocalStrategy strategy, FieldList keys, boolean[] sortDirection) {
    this.localStrategy = strategy;
    this.localKeys = keys;
    this.localSortOrder = sortDirection;

    // invalidate the lazily derived local properties, they depend on the values set above
    this.localProps = null;
}
/**
* Gets the ship strategy that is currently set for this channel.
*
* @return The ship strategy of this channel.
*/
public ShipStrategyType getShipStrategy() {
return this.shipStrategy;
}
/**
* Gets the key fields that were set together with the ship strategy.
*
* @return The ship strategy's key fields, or <code>null</code>, if the strategy was set without keys.
*/
public FieldList getShipStrategyKeys() {
return this.shipKeys;
}
/**
* Returns the TempMode of the Connection. NONE if the connection is not temped,
* TEMP_SENDER_SIDE if the connection is temped on the sender node, and
......@@ -100,4 +162,119 @@ public class Channel implements EstimateProvider
public int getReplicationFactor() {
return this.replicationFactor;
}
// --------------------------------------------------------------------------------------------
/**
 * Gets the estimated size of the data shipped through this channel: the source's estimated
 * output size multiplied by the replication factor.
 * <p>
 * A negative estimate from the source is passed on unchanged. The optimizer nodes report
 * negative values (such as <code>-1</code>) when no estimate is available; scaling such a
 * value would turn the marker into a different, meaningless number.
 *
 * @return The estimated size, or the source's negative value if the size is unknown.
 *
 * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedOutputSize()
 */
@Override
public long getEstimatedOutputSize() {
    final long estimate = this.source.template.getEstimatedOutputSize();
    return estimate < 0 ? estimate : estimate * this.replicationFactor;
}
/**
 * Gets the estimated number of records shipped through this channel: the source's estimated
 * record count multiplied by the replication factor.
 * <p>
 * A negative estimate from the source is passed on unchanged. The optimizer nodes report
 * negative values (such as <code>-1</code>) when no estimate is available; scaling such a
 * value would turn the marker into a different, meaningless number.
 *
 * @return The estimated record count, or the source's negative value if the count is unknown.
 *
 * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedNumRecords()
 */
@Override
public long getEstimatedNumRecords() {
    final long estimate = this.source.template.getEstimatedNumRecords();
    return estimate < 0 ? estimate : estimate * this.replicationFactor;
}
/**
 * Gets the estimated cardinalities for all field sets known to the source, each multiplied
 * by the replication factor. The result is a new map; the source's map is not modified.
 * <p>
 * Negative entries are copied unchanged. The optimizer nodes report negative values (such as
 * <code>-1</code>) when no estimate is available; scaling such a value would turn the marker
 * into a different, meaningless number.
 *
 * @return A new map from field set to the scaled cardinality estimate.
 *
 * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedCardinalities()
 */
@Override
public Map<FieldSet, Long> getEstimatedCardinalities() {
    final Map<FieldSet, Long> sourceEstimates = this.source.template.getEstimatedCardinalities();
    // sized such that all entries fit without rehashing
    final Map<FieldSet, Long> scaled = new HashMap<FieldSet, Long>(sourceEstimates.size() * 2);

    for (Map.Entry<FieldSet, Long> entry : sourceEstimates.entrySet()) {
        final long cardinality = entry.getValue();
        scaled.put(entry.getKey(), cardinality < 0 ? cardinality : cardinality * this.replicationFactor);
    }
    return scaled;
}
/**
 * Gets the estimated cardinality of the given field set after this channel: the source's
 * estimate multiplied by the replication factor.
 * <p>
 * A negative estimate from the source is passed on unchanged. The optimizer nodes report
 * negative values (such as <code>-1</code>) when no estimate is available; scaling such a
 * value would turn the marker into a different, meaningless number.
 *
 * @param cP The field set for which the cardinality is requested.
 * @return The estimated cardinality, or the source's negative value if it is unknown.
 *
 * @see eu.stratosphere.pact.compiler.plan.EstimateProvider#getEstimatedCardinality(eu.stratosphere.pact.common.util.FieldSet)
 */
@Override
public long getEstimatedCardinality(FieldSet cP) {
    final long estimate = this.source.template.getEstimatedCardinality(cP);
    return estimate < 0 ? estimate : estimate * this.replicationFactor;
}
// --------------------------------------------------------------------------------------------
/**
 * Gets the global properties of the data after it was shipped through this channel. The
 * properties are derived from a copy of the source's global properties and the ship strategy,
 * and are cached after the first successful computation.
 * <p>
 * The cache is only filled once the derivation succeeded. A call that fails (for example
 * because the ship strategy is still <tt>NONE</tt>) therefore leaves no partially computed
 * properties behind that a later call could return by accident.
 *
 * @return The global properties after the shipping step.
 * @throws CompilerException Thrown, if the ship strategy has not been set or is not supported.
 */
public GlobalProperties getGlobalProperties() {
    if (this.globalProps == null) {
        // work on a local copy and publish it only after the switch completed normally
        final GlobalProperties props = this.source.getGlobalProperties().clone();

        switch (this.shipStrategy) {
            case BROADCAST:
                props.setFullyReplicated();
                break;
            case PARTITION_HASH:
            case PARTITION_LOCAL_HASH:
                props.setHashPartitioned(this.shipKeys);
                break;
            case PARTITION_RANGE:
            case PARTITION_LOCAL_RANGE:
                final Ordering o = new Ordering();
                for (int i = 0; i < this.shipKeys.size(); i++) {
                    // a missing sort order array means ascending order on all keys
                    final boolean ascending = this.shipSortOrder == null || this.shipSortOrder[i];
                    o.appendOrdering(this.shipKeys.get(i), null, ascending ? Order.ASCENDING : Order.DESCENDING);
                }
                props.setRangePartitioned(o);
                break;
            case FORWARD:
                // the source's properties are taken over unchanged
                break;
            case NONE:
                throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
            default:
                throw new CompilerException("Unsupported shipping strategy: " + this.shipStrategy.name());
        }
        this.globalProps = props;
    }
    return this.globalProps;
}
/**
 * Gets the local properties of the data after it was shipped through this channel and the
 * channel's local strategy was applied. The properties are derived from a copy of the local
 * properties after shipping and are cached after the first successful computation.
 * <p>
 * The cache is only filled once the derivation succeeded. A call that fails (for example
 * because of an unsupported local strategy) therefore leaves no partially computed
 * properties behind that a later call could return by accident.
 *
 * @return The local properties after shipping and the local strategy.
 * @throws CompilerException Thrown, if the ship strategy has not been set, or if the ship
 *                           strategy or the local strategy is not supported.
 */
public LocalProperties getLocalProperties() {
    if (this.localProps == null) {
        // work on a local copy and publish it only after the switch completed normally
        final LocalProperties props = getLocalPropertiesAfterShippingOnly().clone();

        switch (this.localStrategy) {
            case NONE:
                break;
            case SORT:
                final Ordering o = new Ordering();
                for (int i = 0; i < this.localKeys.size(); i++) {
                    // a missing sort order array means ascending order on all keys
                    final boolean ascending = this.localSortOrder == null || this.localSortOrder[i];
                    o.appendOrdering(this.localKeys.get(i), null, ascending ? Order.ASCENDING : Order.DESCENDING);
                }
                props.setOrdering(o);
                props.setGroupedFields(o.getInvolvedIndexes());
                break;
            default:
                throw new CompilerException("Unsupported local strategy for channel.");
        }
        this.localProps = props;
    }
    return this.localProps;
}
/**
 * Gets the local properties of the data directly after the shipping step, before any local
 * strategy of this channel is applied.
 * <p>
 * For a <tt>FORWARD</tt> channel, the source's local properties object itself is returned
 * (not a copy). For broadcasting and all partitioning strategies, a copy of the source's
 * local properties is reset and returned.
 *
 * @return The local properties after shipping.
 * @throws CompilerException Thrown, if the ship strategy has not been set or is not supported.
 */
public LocalProperties getLocalPropertiesAfterShippingOnly() {
    // forwarding hands out the source's properties as they are
    if (this.shipStrategy == ShipStrategyType.FORWARD) {
        return this.source.getLocalProperties();
    }

    final LocalProperties shipped = this.source.getLocalProperties().clone();
    switch (this.shipStrategy) {
        case NONE:
            throw new CompilerException("ShipStrategy has not yet been set.");
        case BROADCAST:
        case PARTITION_HASH:
        case PARTITION_LOCAL_HASH:
        case PARTITION_RANGE:
        case PARTITION_LOCAL_RANGE:
            shipped.reset();
            return shipped;
        default:
            throw new CompilerException();
    }
}
}
......@@ -15,55 +15,69 @@
package eu.stratosphere.pact.compiler.plan.candidate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import eu.stratosphere.pact.common.plan.Visitable;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
*
* @author Stephan Ewen
*/
public abstract class PlanNode implements Visitable<PlanNode>
{
protected final OptimizerNode template = null;
protected final OptimizerNode template;
protected final LocalProperties localProps; // local properties of the data produced by this node
protected final GlobalProperties globalProps; // global properties of the data produced by this node
private final LocalStrategy localStrategy; // The local strategy (sorting / hashing, ...)
private final LocalStrategy localStrategy; // The local strategy (sorting / hashing, ...)
protected LocalProperties localProps; // local properties of the data produced by this node
protected GlobalProperties globalProps; // global properties of the data produced by this node
protected Map<OptimizerNode, PlanNode> branchPlan; // the actual plan alternative chosen at a branch point
protected PlanNode lastJoinedBranchNode; // the node with latest branch (node with multiple outputs)
// that both children share and that is at least partially joined
protected PlanNode lastJoinedBranchNode; // the node with latest branch (node with multiple outputs)
// that both children share and that is at least partially joined
private Costs nodeCosts; // the costs incurred by this node
private Costs nodeCosts; // the costs incurred by this node
private Costs cumulativeCosts; // the cumulative costs of all operators in the sub-tree of this node
private Costs cumulativeCosts; // the cumulative costs of all operators in the sub-tree
private int memoryPerTask; // the amount of memory dedicated to each task, in MiBytes
private int memoryPerTask; // the amount of memory dedicated to each task, in MiBytes
protected boolean pFlag; // flag for the internal pruning algorithm
protected boolean pFlag; // flag for the internal pruning algorithm
// --------------------------------------------------------------------------------------------
/**
* Creates a plan node for the given optimizer node, using the local strategy that the
* optimizer node itself reports. Delegates to {@link #PlanNode(OptimizerNode, LocalStrategy)}.
*
* @param template The optimizer node that this plan node is a candidate for. Must not be
*                 <code>null</code>, since its local strategy is read here.
*/
public PlanNode(OptimizerNode template) {
this(template, template.getLocalStrategy());
}
/**
* Creates a plan node for the given optimizer node with the given local strategy and with
* newly created, default-constructed local and global properties.
*
* @param template The optimizer node that this plan node is a candidate for.
* @param strategy The local strategy of this plan node.
*/
public PlanNode(OptimizerNode template, LocalStrategy strategy) {
this(template, strategy, new LocalProperties(), new GlobalProperties());
}
/**
 * Creates a plan node for the given optimizer node with the given local strategy and
 * properties. The property objects are stored as passed in; no copies are made.
 *
 * @param template The optimizer node that this plan node is a candidate for.
 * @param strategy The local strategy; <code>null</code> is replaced by <tt>LocalStrategy.NONE</tt>.
 * @param localProps The local properties of the data produced by this node.
 * @param globalProps The global properties of the data produced by this node.
 */
public PlanNode(OptimizerNode template, LocalStrategy strategy, LocalProperties localProps, GlobalProperties globalProps)
{
    this.template = template;
    this.localProps = localProps;
    this.globalProps = globalProps;

    // a missing strategy is normalized, so the field is never null
    if (strategy == null) {
        this.localStrategy = LocalStrategy.NONE;
    } else {
        this.localStrategy = strategy;
    }
}
// --------------------------------------------------------------------------------------------
// Accessors
......@@ -155,7 +169,7 @@ public abstract class PlanNode implements Visitable<PlanNode>
}
public int getDegreeOfParallelism() {
return 1;
return this.template.getDegreeOfParallelism();
}
public long getTotalAvailableMemory() {
......@@ -163,7 +177,7 @@ public abstract class PlanNode implements Visitable<PlanNode>
}
// --------------------------------------------------------------------------------------------
//
// Input and Predecessors
// --------------------------------------------------------------------------------------------
public abstract Iterator<Channel> getInputs();
......@@ -174,188 +188,205 @@ public abstract class PlanNode implements Visitable<PlanNode>
// Branching and Pruning
// --------------------------------------------------------------------------------------------
/**
 * Checks whether two candidate plans for the sub-plan of this node are comparable. The two
 * alternative plans are comparable, if
 * a) There is no branch in the sub-plan of this node
 * b) Both candidates have the same candidate as the child at the last open branch.
 * <p>
 * NOTE(review): <tt>branchPlan</tt> is declared with <tt>OptimizerNode</tt> keys, while the
 * lookup key used here (<tt>lastJoinedBranchNode</tt>) is a <tt>PlanNode</tt> - confirm that
 * the lookup can actually find an entry.
 *
 * @param subPlan1 The first candidate sub-plan, must not be null.
 * @param subPlan2 The second candidate sub-plan, must not be null.
 * @return True, if there is no joined branch node, or if both candidates map the last joined
 *         branch node to the identical plan node; false otherwise.
 * @throws CompilerException Thrown, if one of the sub-plans is null.
 */
protected boolean areBranchCompatible(PlanNode subPlan1, PlanNode subPlan2)
{
    if (subPlan1 == null || subPlan2 == null) {
        throw new CompilerException("SubPlans may not be null.");
    }

    // without an open branch, the children are always compatible;
    // that is the dominant case in most plans
    final PlanNode branchPoint = this.lastJoinedBranchNode;
    if (branchPoint == null) {
        return true;
    }

    // compatible only if both candidates picked the very same alternative at the branch
    final PlanNode choiceOfFirst = subPlan1.branchPlan.get(branchPoint);
    final PlanNode choiceOfSecond = subPlan2.branchPlan.get(branchPoint);
    return choiceOfFirst == choiceOfSecond;
}
// /**
// * Checks whether two candidate plans for the sub-plan of this node are comparable. The two
// * alternative plans are comparable, if
// * a) There is no branch in the sub-plan of this node
// * b) Both candidates have the same candidate as the child at the last open branch.
// *
// * @param subPlan1
// * @param subPlan2
// * @return
// */
// protected boolean areBranchCompatible(PlanNode subPlan1, PlanNode subPlan2)
// {
// if (subPlan1 == null || subPlan2 == null)
// throw new CompilerException("SubPlans may not be null.");
//
// // if there is no open branch, the children are always compatible.
// // in most plans, that will be the dominant case
// if (this.lastJoinedBranchNode == null) {
// return true;
// }
//
// final PlanNode nodeToCompare = subPlan1.branchPlan.get(this.lastJoinedBranchNode);
// return nodeToCompare == subPlan2.branchPlan.get(this.lastJoinedBranchNode);
// }
//
// /**
// * Takes the given list of plans that are candidates for this node in the final plan and retains for each distinct
// * set of interesting properties only the cheapest plan.
// *
// * @param plans
// * The plans to prune.
// */
// public <T extends OptimizerNode> void prunePlanAlternatives(List<T> plans) {
// // shortcut for the case that there is only one plan
// if (plans.size() == 1) {
// return;
// }
//
// // if we have unjoined branches, split the list of plans such that only those
// // with the same candidates at the branch points are compared
// // otherwise, we may end up with the case that no compatible plans are found at
// // nodes that join
// if (this.openBranches == null) {
// prunePlansWithCommonBranchAlternatives(plans);
// } else {
// // TODO brute force still
// List<T> result = new ArrayList<T>();
// List<T> turn = new ArrayList<T>();
//
// while (!plans.isEmpty()) {
// turn.clear();
// T determiner = plans.remove(plans.size() - 1);
// turn.add(determiner);
//
// for (int k = plans.size() - 1; k >= 0; k--) {
// boolean equal = true;
// T toCheck = plans.get(k);
//
// for (int b = 0; b < this.openBranches.size(); b++) {
// OptimizerNode brancher = this.openBranches.get(b).branchingNode;
// OptimizerNode cand1 = determiner.branchPlan.get(brancher);
// OptimizerNode cand2 = toCheck.branchPlan.get(brancher);
// if (cand1 != cand2) {
// equal = false;
// break;
// }
// }
//
// if (equal) {
// turn.add(plans.remove(k));
// }
// }
//
// // now that we have only plans with the same branch alternatives, prune!
// if (turn.size() > 1) {
// prunePlansWithCommonBranchAlternatives(turn);
// }
// result.addAll(turn);
// }
//
// // after all turns are complete
// plans.clear();
// plans.addAll(result);
// }
// }
//
// private final <T extends OptimizerNode> void prunePlansWithCommonBranchAlternatives(List<T> plans) {
// List<List<T>> toKeep = new ArrayList<List<T>>(this.intProps.size()); // for each interesting property, which plans
// // are cheapest
// for (int i = 0; i < this.intProps.size(); i++) {
// toKeep.add(null);
// }
//
// T cheapest = null; // the overall cheapest plan
//
// // go over all plans from the list
// for (T candidate : plans) {
// // check if that plan is the overall cheapest
// if (cheapest == null || (cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) {
// cheapest = candidate;
// }
//
// // find the interesting properties that this plan matches
// for (int i = 0; i < this.intProps.size(); i++) {
// if (this.intProps.get(i).isMetBy(candidate)) {
// // the candidate meets them
// if (toKeep.get(i) == null) {
// // first one to meet the interesting properties, so store it
// List<T> l = new ArrayList<T>(2);
// l.add(candidate);
// toKeep.set(i, l);
// } else {
// // others met that one before
// // see if that one is more expensive and not more general than
// // one of the others. If so, drop it.
// List<T> l = toKeep.get(i);
// boolean met = false;
// boolean replaced = false;
//
// for (int k = 0; k < l.size(); k++) {
// T other = l.get(k);
//
// // check if the candidate is both cheaper and at least as general
// if (other.getGlobalProperties().isMetBy(candidate.getGlobalProperties())
// && other.getLocalProperties().isMetBy(candidate.getLocalProperties())
// && other.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) {
// // replace that one with the candidate
// l.set(k, replaced ? null : candidate);
// replaced = true;
// met = true;
// } else {
// // check if the previous plan is more general and not more expensive than the candidate
// met |= (candidate.getGlobalProperties().isMetBy(other.getGlobalProperties())
// && candidate.getLocalProperties().isMetBy(other.getLocalProperties()) && candidate
// .getCumulativeCosts().compareTo(other.getCumulativeCosts()) >= 0);
// }
// }
//
// if (!met) {
// l.add(candidate);
// }
// }
// }
// }
// }
//
// // all plans are set now
// plans.clear();
//
// // add the cheapest plan
// if (cheapest != null) {
// plans.add(cheapest);
// cheapest.pFlag = true; // remember that that plan is in the set
// }
//
// Costs cheapestCosts = cheapest.cumulativeCosts;
//
// // add all others, which are optimal for some interesting properties
// for (int i = 0; i < toKeep.size(); i++) {
// List<T> l = toKeep.get(i);
//
// if (l != null) {
// Costs maxDelta = this.intProps.get(i).getMaximalCosts();
//
// for (T plan : l) {
// if (plan != null && !plan.pFlag) {
// plan.pFlag = true;
//
// // check, if that plan is not more than the delta above the costs of the
// if (!cheapestCosts.isOtherMoreThanDeltaAbove(plan.getCumulativeCosts(), maxDelta)) {
// plans.add(plan);
// }
// }
// }
// }
// }
//
// // reset the flags
// for (T p : plans) {
// p.pFlag = false;
// }
// }
/**
 * Takes the given list of plans that are candidates for this node in the final plan and prunes it in place,
 * so that for each distinct set of interesting properties only the cheapest plans are retained.
 * <p>
 * If this node has open (unjoined) branches, the plans are first split into groups whose members use the
 * identical candidate at every branching node, and every group is pruned on its own. Otherwise nodes that
 * join the branches again might not find any compatible plans.
 * <p>
 * Lists with fewer than two plans are left unchanged.
 *
 * @param plans
 *        The plans to prune. The list is modified in place.
 */
public <T extends OptimizerNode> void prunePlanAlternatives(List<T> plans) {
	// shortcut for the case that there is nothing to compare. The empty list must be caught here as
	// well: the pruning routine dereferences the cheapest plan, which does not exist for an empty list.
	if (plans.size() <= 1) {
		return;
	}

	// without open branches, all plans are comparable with each other
	if (this.openBranches == null) {
		prunePlansWithCommonBranchAlternatives(plans);
		return;
	}

	// if we have unjoined branches, split the list of plans such that only those
	// with the same candidates at the branch points are compared
	// TODO brute force still
	final List<T> result = new ArrayList<T>();
	final List<T> turn = new ArrayList<T>();

	while (!plans.isEmpty()) {
		// the last remaining plan determines the branch candidates for this turn
		turn.clear();
		final T determiner = plans.remove(plans.size() - 1);
		turn.add(determiner);

		// walk backwards, so that removing an element does not shift the ones still to be checked
		for (int k = plans.size() - 1; k >= 0; k--) {
			final T toCheck = plans.get(k);
			boolean equal = true;

			for (int b = 0; b < this.openBranches.size(); b++) {
				final OptimizerNode brancher = this.openBranches.get(b).branchingNode;
				final OptimizerNode cand1 = determiner.branchPlan.get(brancher);
				final OptimizerNode cand2 = toCheck.branchPlan.get(brancher);

				// compared by identity: both plans must use the very same candidate object
				if (cand1 != cand2) {
					equal = false;
					break;
				}
			}

			if (equal) {
				turn.add(plans.remove(k));
			}
		}

		// now that we have only plans with the same branch alternatives, prune!
		if (turn.size() > 1) {
			prunePlansWithCommonBranchAlternatives(turn);
		}
		result.addAll(turn);
	}

	// after all turns are complete, the input list has been drained and takes the surviving plans
	plans.addAll(result);
}
// --------------------------------------------------------------------------------------------
// Miscellaneous
// --------------------------------------------------------------------------------------------
/**
 * Prunes the given list of plans in place. After the call, the list holds the overall cheapest plan plus,
 * for every interesting property of this node, those retained plans that meet the property and whose
 * cumulative costs are not more than the property's maximal costs above the costs of the cheapest plan.
 * <p>
 * The <code>pFlag</code> of a plan is set while the result is assembled, so that a plan that was retained
 * for several interesting properties is added only once.
 *
 * @param plans
 *        The plans to prune. The list is cleared and refilled. It must not be empty, because the
 *        cheapest plan is dereferenced unconditionally.
 */
private final <T extends OptimizerNode> void prunePlansWithCommonBranchAlternatives(List<T> plans) {
List<List<T>> toKeep = new ArrayList<List<T>>(this.intProps.size()); // for each interesting property, which plans
// are cheapest
// one slot per interesting property; null means that no plan has met that property so far
for (int i = 0; i < this.intProps.size(); i++) {
toKeep.add(null);
}
T cheapest = null; // the overall cheapest plan
// go over all plans from the list
for (T candidate : plans) {
// check if that plan is the overall cheapest
if (cheapest == null || (cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) {
cheapest = candidate;
}
// find the interesting properties that this plan matches
for (int i = 0; i < this.intProps.size(); i++) {
if (this.intProps.get(i).isMetBy(candidate)) {
// the candidate meets them
if (toKeep.get(i) == null) {
// first one to meet the interesting properties, so store it
List<T> l = new ArrayList<T>(2);
l.add(candidate);
toKeep.set(i, l);
} else {
// others met that one before
// see if that one is more expensive and not more general than
// one of the others. If so, drop it.
List<T> l = toKeep.get(i);
// met: the candidate is already covered, either because it replaced a retained plan or because
// a retained plan makes it redundant. replaced: the candidate already took the slot of a retained plan.
boolean met = false;
boolean replaced = false;
for (int k = 0; k < l.size(); k++) {
T other = l.get(k);
// check if the candidate is both cheaper and at least as general
if (other.getGlobalProperties().isMetBy(candidate.getGlobalProperties())
&& other.getLocalProperties().isMetBy(candidate.getLocalProperties())
&& other.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) {
// replace that one with the candidate
// if the candidate already replaced an earlier entry, this slot is set to null instead, so that
// the candidate is stored only once; null entries are skipped when the result is assembled
l.set(k, replaced ? null : candidate);
replaced = true;
met = true;
} else {
// check if the previous plan is more general and not more expensive than the candidate
met |= (candidate.getGlobalProperties().isMetBy(other.getGlobalProperties())
&& candidate.getLocalProperties().isMetBy(other.getLocalProperties()) && candidate
.getCumulativeCosts().compareTo(other.getCumulativeCosts()) >= 0);
}
}
// neither a replacement nor redundant: retain the candidate in addition to the others
if (!met) {
l.add(candidate);
}
}
}
}
}
// all plans are set now
plans.clear();
// add the cheapest plan
if (cheapest != null) {
plans.add(cheapest);
cheapest.pFlag = true; // remember that that plan is in the set
}
// NOTE(review): cheapest is still null here when the given list was empty, in which case the
// following access fails with a NullPointerException despite the null check above
Costs cheapestCosts = cheapest.cumulativeCosts;
// add all others, which are optimal for some interesting properties
for (int i = 0; i < toKeep.size(); i++) {
List<T> l = toKeep.get(i);
if (l != null) {
Costs maxDelta = this.intProps.get(i).getMaximalCosts();
for (T plan : l) {
// skip the null slots left by replacements and the plans that were handled before
if (plan != null && !plan.pFlag) {
plan.pFlag = true;
// check, if that plan is not more than the delta above the costs of the
// cheapest plan
if (!cheapestCosts.isOtherMoreThanDeltaAbove(plan.getCumulativeCosts(), maxDelta)) {
plans.add(plan);
}
}
}
}
}
/**
 * Uses the given unique field combinations as a hint for the local properties. Only the first combination
 * returned by the set's iterator is considered: it is set as the unique fields of the local properties,
 * if the local properties have no unique fields yet. A warning is logged if more than one combination
 * is given. An empty set leaves the properties unchanged.
 *
 * @param uniqueFieldCombinations
 *        The unique field combinations of the node. Must not be null.
 */
public void updatePropertiesWithUniqueSets(Set<FieldSet> uniqueFieldCombinations)
{
if (uniqueFieldCombinations.isEmpty()) {
return;
} else if (uniqueFieldCombinations.size() > 1) {
PactCompiler.LOG.warn("Node has multiple unique field combinations. " +
"The compiler can currently exploit only the first one as a hint.");
}
// NOTE(review): the flag-reset loop below refers to 'T' and 'plans', neither of which is declared by
// this method. It looks like a leftover of the plan pruning code - confirm and remove.
// reset the flags
for (T p : plans) {
p.pFlag = false;
final FieldSet unique = uniqueFieldCombinations.iterator().next();
// existing unique fields are never overwritten
if (this.localProps.getUniqueFields() == null) {
this.localProps.setUniqueFields(unique);
}
}
// --------------------------------------------------------------------------------------------
}
......@@ -21,6 +21,8 @@ import java.util.NoSuchElementException;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
......@@ -28,13 +30,16 @@ import eu.stratosphere.pact.compiler.Costs;
*
* @author Stephan Ewen
*/
public abstract class SingleInputPlanNode extends PlanNode
public class SingleInputPlanNode extends PlanNode
{
protected final Channel input;
// --------------------------------------------------------------------------------------------
/**
 * Creates a new plan node with a single input. The template and the local strategy are handed to the
 * super class constructor, the input channel is stored in this node.
 *
 * @param template The optimizer node passed on to the super class.
 * @param input The channel that is the input of this node.
 * @param localStrategy The local strategy passed on to the super class.
 */
// NOTE(review): two constructor headers follow each other directly; only one of them can be intended - confirm.
protected SingleInputPlanNode(Channel input) {
public SingleInputPlanNode(OptimizerNode template, Channel input, LocalStrategy localStrategy)
{
super(template, localStrategy);
this.input = input;
}
......@@ -93,4 +98,30 @@ public abstract class SingleInputPlanNode extends PlanNode
}
};
}
/**
 * Gets an iterator over the inputs of this node. The iterator returns the single input channel of this
 * node exactly once and does not support the removal of elements.
 *
 * @return An iterator over the one input channel.
 * @see eu.stratosphere.pact.compiler.plan.candidate.PlanNode#getInputs()
 */
@Override
public Iterator<Channel> getInputs() {
	return new Iterator<Channel>() {

		/** Flag that is set as soon as the input channel has been returned. */
		private boolean consumed;

		@Override
		public boolean hasNext() {
			return !this.consumed;
		}

		@Override
		public Channel next() {
			// guard clause: the one element may be handed out only once
			if (this.consumed) {
				throw new NoSuchElementException();
			}
			this.consumed = true;
			return SingleInputPlanNode.this.input;
		}

		@Override
		public void remove() {
			throw new UnsupportedOperationException();
		}
	};
}
}
......@@ -13,76 +13,56 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler;
package eu.stratosphere.pact.compiler.plan.candidate;
import eu.stratosphere.pact.common.type.Key;
import java.util.Collections;
import java.util.Iterator;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
*
* Plan candidate node for data flow sources that have no input and no special strategies.
*
* @author Stephan Ewen
*/
// NOTE(review): two class headers follow each other directly and the members below mix column accessors
// with plan node methods; only one of the two classes can be intended here - confirm.
public class ColumnWithType
public class SourcePlanNode extends PlanNode
{
// the key class of the column
private final Class<? extends Key> columnType;
// the index of the column
private final int columnIndex;
/**
 * Creates a new column from its index and its key type.
 *
 * @param columnIndex The column index.
 * @param columnType The column type.
 */
public ColumnWithType(int columnIndex, Class<? extends Key> columnType) {
this.columnType = columnType;
this.columnIndex = columnIndex;
}
/**
* Gets the columnType from this column.
*
* @return The column type.
*/
public Class<? extends Key> getColumnType() {
return columnType;
}
/**
* Gets the column index from this column.
*
* @return The column index.
* Constructs a new source candidate node that uses <i>NONE</i> as its local strategy.
*
* @param template The template optimizer node that this candidate is created for.
*/
public int getColumnIndex() {
return columnIndex;
public SourcePlanNode(OptimizerNode template) {
super(template, LocalStrategy.NONE);
}
// --------------------------------------------------------------------------------------------
/* (non-Javadoc)
* @see java.lang.Object#hashCode()
* @see eu.stratosphere.pact.common.plan.Visitable#accept(eu.stratosphere.pact.common.plan.Visitor)
*/
@Override
public int hashCode() {
// the hash code is the column index alone; the column type does not contribute
return this.columnIndex;
public void accept(Visitor<PlanNode> visitor) {
// no inputs are visited in between: the post-visit directly follows an accepted pre-visit
if (visitor.preVisit(this)) {
visitor.postVisit(this);
}
}
/* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
* @see eu.stratosphere.pact.compiler.plan.candidate.PlanNode#getPredecessors()
*/
@Override
public boolean equals(Object obj)
{
// equality is decided by the column index; equal indexes with different types are not reported
// as unequal, but raise a RuntimeException
if (obj != null && obj instanceof ColumnWithType) {
final ColumnWithType cwt = (ColumnWithType) obj;
if (this.columnIndex == cwt.columnIndex) {
if (this.columnType != cwt.columnType) {
// DEBUG ONLY!!! Should never happen, if used properly.
throw new RuntimeException("Comparing columns with equal position but incompatible type.");
}
return true;
}
}
return false;
public Iterator<PlanNode> getPredecessors() {
// always an iterator without elements
return Collections.<PlanNode>emptyList().iterator();
}
/* (non-Javadoc)
* @see java.lang.Object#toString()
* @see eu.stratosphere.pact.compiler.plan.candidate.PlanNode#getInputs()
*/
@Override
public String toString() {
return "Column " + this.columnIndex + " (" + this.columnType.getName() + ')';
public Iterator<Channel> getInputs() {
// always an iterator without elements
return Collections.<Channel>emptyList().iterator();
}
}
\ No newline at end of file
}
......@@ -18,7 +18,7 @@ package eu.stratosphere.pact.runtime.shipping;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.pact.common.generic.types.TypeComparator;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
/**
* @author Erik Nijkamp
......
......@@ -20,7 +20,7 @@ import eu.stratosphere.pact.common.contract.DataDistribution;
import eu.stratosphere.pact.common.type.Key;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.runtime.plugable.PactRecordComparator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
/**
* @author Erik Nijkamp
......
......@@ -15,106 +15,19 @@
package eu.stratosphere.pact.runtime.shipping;
import eu.stratosphere.pact.common.util.FieldList;
/**
* Enumeration defining the different shipping types of the output, such as local forward, re-partitioning by hash,
* or re-partitioning by range.
*
* @author Stephan Ewen
*/
public abstract class ShipStrategy {

	/**
	 * The types of ship strategies. Every strategy nested in {@link ShipStrategy} reports one of these
	 * values through {@link ShipStrategy#type()}.
	 */
	public enum ShipStrategyType {
		FORWARD,
		PARTITION_HASH,
		PARTITION_LOCAL_HASH,
		PARTITION_RANGE,
		PARTITION_LOCAL_RANGE,
		BROADCAST,
		SFR,
		NONE
	}

	/** The type of this strategy. Assigned once in the constructor and never changed. */
	private final ShipStrategyType type;

	/**
	 * Creates a strategy of the given type. The constructor is private, so only the classes nested
	 * in this class can extend it.
	 *
	 * @param type The type of the strategy.
	 */
	private ShipStrategy(ShipStrategyType type) {
		this.type = type;
	}

	/**
	 * Gets the type of this strategy.
	 *
	 * @return The strategy type.
	 */
	public ShipStrategyType type() {
		return this.type;
	}

	/**
	 * Gets the name of this strategy, which is the name of its type constant.
	 *
	 * @return The name of the strategy type.
	 */
	public String name() {
		return this.type.name();
	}

	// ------------------ SHIP STRATEGIES ---------------

	/** The strategy of type {@link ShipStrategyType#FORWARD}. */
	public static class ForwardSS extends ShipStrategy {
		public ForwardSS() {
			super(ShipStrategyType.FORWARD);
		}
	}

	/** The strategy of type {@link ShipStrategyType#BROADCAST}. */
	public static class BroadcastSS extends ShipStrategy {
		public BroadcastSS() {
			super(ShipStrategyType.BROADCAST);
		}
	}

	/** The strategy of type {@link ShipStrategyType#SFR}. */
	public static class SFRSS extends ShipStrategy {
		public SFRSS() {
			super(ShipStrategyType.SFR);
		}
	}

	/** The strategy of type {@link ShipStrategyType#NONE}. */
	public static class NoneSS extends ShipStrategy {
		public NoneSS() {
			super(ShipStrategyType.NONE);
		}
	}

	/**
	 * Base class of the strategies that carry a list of partition fields in addition to their type.
	 */
	public static abstract class PartitionShipStrategy extends ShipStrategy {

		/** The partition fields. Assigned once in the constructor and never changed. */
		private final FieldList partitionFields;

		/**
		 * Creates a partitioning strategy. The constructor is private, so only classes nested in
		 * {@link ShipStrategy} can extend this class.
		 *
		 * @param type The type of the strategy.
		 * @param partitionFields The partition fields. Stored as given, without a copy.
		 */
		private PartitionShipStrategy(ShipStrategyType type, FieldList partitionFields) {
			super(type);
			this.partitionFields = partitionFields;
		}

		/**
		 * Gets the partition fields of this strategy.
		 *
		 * @return The partition fields, as handed to the constructor.
		 */
		public FieldList getPartitionFields() {
			return this.partitionFields;
		}
	}

	/** The partitioning strategy of type {@link ShipStrategyType#PARTITION_HASH}. */
	public static class PartitionHashSS extends PartitionShipStrategy {
		public PartitionHashSS(FieldList partitionFields) {
			super(ShipStrategyType.PARTITION_HASH, partitionFields);
		}
	}

	/** The partitioning strategy of type {@link ShipStrategyType#PARTITION_LOCAL_HASH}. */
	public static class PartitionLocalHashSS extends PartitionShipStrategy {
		public PartitionLocalHashSS(FieldList partitionFields) {
			super(ShipStrategyType.PARTITION_LOCAL_HASH, partitionFields);
		}
	}

	/** The partitioning strategy of type {@link ShipStrategyType#PARTITION_RANGE}. */
	public static class PartitionRangeSS extends PartitionShipStrategy {
		public PartitionRangeSS(FieldList partitionFields) {
			super(ShipStrategyType.PARTITION_RANGE, partitionFields);
		}
	}

	/** The partitioning strategy of type {@link ShipStrategyType#PARTITION_LOCAL_RANGE}. */
	public static class PartitionLocalRangeSS extends PartitionShipStrategy {
		public PartitionLocalRangeSS(FieldList partitionFields) {
			super(ShipStrategyType.PARTITION_LOCAL_RANGE, partitionFields);
		}
	}
}
\ No newline at end of file
/**
 * The types of shipping strategies. The constants are kept in their original declaration order,
 * because the order determines their ordinals and their natural ordering.
 */
public enum ShipStrategyType {

	NONE,

	FORWARD,

	PARTITION_HASH,

	PARTITION_LOCAL_HASH,

	PARTITION_RANGE,

	PARTITION_LOCAL_RANGE,

	BROADCAST;
}
\ No newline at end of file
......@@ -58,7 +58,7 @@ import eu.stratosphere.pact.runtime.shipping.OutputCollector;
import eu.stratosphere.pact.runtime.shipping.OutputEmitter;
import eu.stratosphere.pact.runtime.shipping.PactRecordOutputCollector;
import eu.stratosphere.pact.runtime.shipping.PactRecordOutputEmitter;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.chaining.ChainedDriver;
import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
import eu.stratosphere.pact.runtime.task.util.NepheleReaderIterator;
......
......@@ -32,7 +32,7 @@ import eu.stratosphere.pact.common.generic.types.TypePairComparatorFactory;
import eu.stratosphere.pact.common.generic.types.TypeSerializerFactory;
import eu.stratosphere.pact.common.stubs.Stub;
import eu.stratosphere.pact.common.util.InstantiationUtil;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.PactDriver;
import eu.stratosphere.pact.runtime.task.chaining.ChainedDriver;
......
......@@ -40,7 +40,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.runtime.plugable.PactRecordComparator;
import eu.stratosphere.pact.runtime.shipping.PactRecordOutputEmitter;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
public class OutputEmitterTest extends TestCase
{
......
......@@ -32,7 +32,7 @@ import eu.stratosphere.pact.common.io.FileOutputFormat;
import eu.stratosphere.pact.common.stubs.Stub;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.util.MutableObjectIterator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DataSinkTask;
import eu.stratosphere.pact.runtime.task.DataSourceTask;
import eu.stratosphere.pact.runtime.task.PactDriver;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册