diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/GlobalProperties.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/GlobalProperties.java index 93191220c498c3ac28800a690f06de87b1ffae33..3a1e53700ce5fc44255d1a6939e6ce14c5fd4582 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/GlobalProperties.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/GlobalProperties.java @@ -23,7 +23,11 @@ import eu.stratosphere.pact.common.contract.Order; import eu.stratosphere.pact.common.contract.Ordering; import eu.stratosphere.pact.common.util.FieldList; import eu.stratosphere.pact.common.util.FieldSet; +import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.plan.OptimizerNode; +import eu.stratosphere.pact.compiler.plan.candidate.Channel; +import eu.stratosphere.pact.compiler.util.Utils; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; /** * This class represents global properties of the data at a certain point in the plan. @@ -233,6 +237,26 @@ public class GlobalProperties implements Cloneable } return this; } + + public void parameterizeChannel(Channel channel, boolean globalDopChange) { + switch (this.partitioning) { + case RANDOM: + channel.setShipStrategy(globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD); + break; + case FULL_REPLICATION: + channel.setShipStrategy(ShipStrategyType.BROADCAST); + break; + case ANY_PARTITIONING: + case HASH_PARTITIONED: + channel.setShipStrategy(ShipStrategyType.PARTITION_HASH, Utils.createOrderedFromSet(this.partitioningFields)); + break; + case RANGE_PARTITIONED: + channel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections()); + break; + default: + throw new CompilerException(); + } + } // ------------------------------------------------------------------------ diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/RequestedLocalProperties.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/RequestedLocalProperties.java index a5e64914b08bfc942cee87787350a343b52d56d7..3ed07bedaa2cb9b803e38c7e37dc239a10f3174a 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/RequestedLocalProperties.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/RequestedLocalProperties.java @@ -21,9 +21,6 @@ import eu.stratosphere.pact.common.contract.Ordering; import eu.stratosphere.pact.common.util.FieldList; import eu.stratosphere.pact.common.util.FieldSet; import eu.stratosphere.pact.compiler.CompilerException; -import eu.stratosphere.pact.compiler.costs.CostEstimator; -import eu.stratosphere.pact.compiler.costs.Costs; -import eu.stratosphere.pact.compiler.plan.EstimateProvider; import eu.stratosphere.pact.compiler.plan.OptimizerNode; import eu.stratosphere.pact.compiler.plan.candidate.Channel; import eu.stratosphere.pact.compiler.util.Utils; @@ -33,11 +30,15 @@ import eu.stratosphere.pact.runtime.task.util.LocalStrategy; * This class represents local properties of the data. A local property is a property that exists * within the data of a single partition. */ -public final class RequestedLocalProperties implements Cloneable -{ - private Ordering ordering; // order inside a partition, null if not ordered +public class RequestedLocalProperties implements Cloneable { - private FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped. + public static final RequestedLocalProperties DEFAULT_PROPERTIES = null; + + // -------------------------------------------------------------------------------------------- + + Ordering ordering; // order inside a partition, null if not ordered + + FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped. // -------------------------------------------------------------------------------------------- @@ -140,8 +141,7 @@ public final class RequestedLocalProperties implements Cloneable * * @return True, if the resulting properties are non trivial. */ - public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode node, int input) - { + public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { if (this.ordering != null) { final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); for (int i = 0; i < involvedIndexes.size(); i++) { @@ -168,8 +168,7 @@ public final class RequestedLocalProperties implements Cloneable * The properties for which to check whether they meet these properties. * @return True, if the properties are met, false otherwise. */ - public boolean isMetBy(LocalProperties other) - { + public boolean isMetBy(LocalProperties other) { if (this.ordering != null) { // we demand an ordering return other.getOrdering() != null && this.ordering.isMetBy(other.getOrdering()); @@ -200,15 +199,7 @@ public final class RequestedLocalProperties implements Cloneable } } - public void addMinimalRequiredCosts(Costs to, CostEstimator estimator, EstimateProvider estimate, long memory) { - if (this.ordering != null) { - estimator.addLocalSortCost(estimate, memory, to); - } else if (this.groupedFields != null) { - estimator.addLocalSortCost(estimate, memory, to); - } - } - - // ------------------------------------------------------------------------ + // -------------------------------------------------------------------------------------------- /** * @param requested1 @@ -246,12 +237,8 @@ public final class RequestedLocalProperties implements Cloneable } } - // ------------------------------------------------------------------------ + // -------------------------------------------------------------------------------------------- - /* - * (non-Javadoc) - * @see java.lang.Object#hashCode() - */ @Override public int hashCode() { final int prime = 31; @@ -261,10 +248,6 @@ public final class RequestedLocalProperties implements Cloneable return result; } - /* - * (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ @Override public boolean equals(Object obj) { if (obj instanceof RequestedLocalProperties) { @@ -276,19 +259,11 @@ public final class RequestedLocalProperties implements Cloneable } } - /* - * (non-Javadoc) - * @see java.lang.Object#toString() - */ @Override public String toString() { return "Requested Local Properties [ordering=" + this.ordering + ", grouped=" + this.groupedFields + "]"; } - /* - * (non-Javadoc) - * @see java.lang.Object#clone() - */ @Override public RequestedLocalProperties clone() { return new RequestedLocalProperties(this.ordering, this.groupedFields); diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/operators/BinaryUnionOpDescriptor.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/operators/BinaryUnionOpDescriptor.java index 00ac32bce90069b7cf5769811f177a0f24321efc..b437484c779f93b0fd92ede46ca1ff4e16cfdba8 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/operators/BinaryUnionOpDescriptor.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/operators/BinaryUnionOpDescriptor.java @@ -33,29 +33,33 @@ import eu.stratosphere.pact.runtime.task.DriverStrategy; /** * */ -public class BinaryUnionOpDescriptor extends OperatorDescriptorDual -{ - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.dataproperties.DriverProperties#getStrategy() - */ +public class BinaryUnionOpDescriptor extends OperatorDescriptorDual { + + private final RequestedGlobalProperties props; + + public BinaryUnionOpDescriptor(RequestedGlobalProperties props) { + super(); + if (props == null) { + throw new NullPointerException(); + } + this.props = props; + } + @Override public DriverStrategy getStrategy() { - return DriverStrategy.NONE; + return DriverStrategy.UNION; } - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual#createPossibleGlobalProperties() - */ @Override - protected List createPossibleGlobalProperties() { - // all properties are possible - return Collections.singletonList(new GlobalPropertiesPair( - new RequestedGlobalProperties(), new RequestedGlobalProperties())); + public List getPossibleGlobalProperties() { + return Collections.singletonList(new GlobalPropertiesPair(this.props, this.props)); } - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.dataproperties.DriverPropertiesDual#createPossibleLocalProperties() - */ + @Override + protected List createPossibleGlobalProperties() { + return Collections.emptyList(); + } + @Override protected List createPossibleLocalProperties() { // all properties are possible @@ -63,17 +67,11 @@ public class BinaryUnionOpDescriptor extends OperatorDescriptorDual new RequestedLocalProperties(), new RequestedLocalProperties())); } - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.dataproperties.DriverPropertiesDual#instantiate(eu.stratosphere.pact.compiler.plan.candidate.Channel, eu.stratosphere.pact.compiler.plan.candidate.Channel, eu.stratosphere.pact.compiler.plan.TwoInputNode) - */ @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { return new BinaryUnionPlanNode((BinaryUnionNode) node, in1, in2); } - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual#computeGlobalProperties(eu.stratosphere.pact.compiler.dataproperties.GlobalProperties, eu.stratosphere.pact.compiler.dataproperties.GlobalProperties) - */ @Override public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { GlobalProperties newProps = new GlobalProperties(); @@ -88,9 +86,6 @@ public class BinaryUnionOpDescriptor extends OperatorDescriptorDual return newProps; } - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual#computeLocalProperties(eu.stratosphere.pact.compiler.dataproperties.LocalProperties, eu.stratosphere.pact.compiler.dataproperties.LocalProperties) - */ @Override public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { // all local properties are destroyed diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/BinaryUnionNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/BinaryUnionNode.java index bff315b975cc47285069450d1407852471de70df..6a217f840cc1f459e1a2ed3bb22e0ec869b97714 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/BinaryUnionNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/BinaryUnionNode.java @@ -15,17 +15,22 @@ package eu.stratosphere.pact.compiler.plan; -import java.util.Collections; -import java.util.HashSet; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import eu.stratosphere.pact.common.util.FieldSet; +import eu.stratosphere.pact.compiler.CompilerException; import eu.stratosphere.pact.compiler.DataStatistics; +import eu.stratosphere.pact.compiler.costs.CostEstimator; +import eu.stratosphere.pact.compiler.dataproperties.GlobalProperties; +import eu.stratosphere.pact.compiler.dataproperties.InterestingProperties; +import eu.stratosphere.pact.compiler.dataproperties.RequestedGlobalProperties; +import eu.stratosphere.pact.compiler.dataproperties.RequestedLocalProperties; import eu.stratosphere.pact.compiler.operators.BinaryUnionOpDescriptor; import eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual; +import eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual.LocalPropertiesPair; +import eu.stratosphere.pact.compiler.plan.candidate.Channel; +import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; import eu.stratosphere.pact.generic.contract.Contract; import eu.stratosphere.pact.generic.contract.DualInputContract; import eu.stratosphere.pact.generic.stub.AbstractStub; @@ -37,7 +42,6 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; * input. */ public class BinaryUnionNode extends TwoInputNode { - public BinaryUnionNode(OptimizerNode pred1, OptimizerNode pred2) { super(new UnionPlaceholderContract()); @@ -58,140 +62,136 @@ public class BinaryUnionNode extends TwoInputNode { } } - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getName() - */ @Override public String getName() { return "Union"; } - - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#setInputs(java.util.Map) - */ + @Override public void setInputs(Map contractToNode) { throw new UnsupportedOperationException(); } - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.TwoInputNode#getPossibleProperties() - */ @Override protected List getPossibleProperties() { - return Collections.singletonList(new BinaryUnionOpDescriptor()); + return new ArrayList(); } + @Override public void computeUnionOfInterestingPropertiesFromSuccessors() { super.computeUnionOfInterestingPropertiesFromSuccessors(); // clear all local properties, as they are destroyed anyways getInterestingProperties().getLocalProperties().clear(); } - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#readStubAnnotations() - */ - @Override - protected void readStubAnnotations() { - //DO NOTHING - //do not read annotations for union nodes, as this node is artificially generated - } - - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#readConstantAnnotation() - */ - @Override - protected void readConstantAnnotation() { - //DO NOTHING - } - - /* (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#isFieldConstant(int, int) - */ @Override - public boolean isFieldConstant(int input, int fieldNumber) { - return true; + public void computeInterestingPropertiesForInputs(CostEstimator estimator) { + final InterestingProperties props = getInterestingProperties(); + + // if no other properties exist, add the pruned trivials back + if (props.getGlobalProperties().isEmpty()) { + props.addGlobalProperties(new RequestedGlobalProperties()); + } + props.addLocalProperties(new RequestedLocalProperties()); + this.input1.setInterestingProperties(props.clone()); + this.input2.setInterestingProperties(props.clone()); + + // make these interesting properties the only allowed properties for the candidates + this.possibleProperties.clear(); + for (RequestedGlobalProperties gprops : props.getGlobalProperties()) { + this.possibleProperties.add(new BinaryUnionOpDescriptor(gprops)); + } } - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeOutputEstimates(eu.stratosphere.pact.compiler.DataStatistics) - */ @Override - public void computeOutputEstimates(DataStatistics statistics) { - - - this.estimatedNumRecords = 0; - this.estimatedOutputSize = 0; - - // init estimated cardinalities with the fields of the first input - // remove the field which are unknown for other inputs later on - for (FieldSet fieldSet : getIncomingConnections().get(0).getSource().getEstimatedCardinalities().keySet()) { - this.estimatedCardinality.put(fieldSet, -1L); + protected void addLocalCandidates(Channel c1, Channel c2, RequestedGlobalProperties rgps1, + RequestedGlobalProperties rgps2, List target, LocalPropertiesPair[] validLocalCombinations, + CostEstimator estimator) + { + if (!(rgps1.equals(rgps2))) { + return; } + // get the global properties and clear unique fields (not preserved anyways during the union) + GlobalProperties p1 = c1.getGlobalProperties(); + GlobalProperties p2 = c2.getGlobalProperties(); + p1.clearUniqueFieldCombinations(); + p2.clearUniqueFieldCombinations(); - for (PactConnection inConn : getIncomingConnections()) { - - OptimizerNode inputPact = inConn.getSource(); - - // sum up estimatedNumRecords for inputs - long estimatedNumRecordForInput = inputPact.estimatedNumRecords; - - if (estimatedNumRecordForInput != -1 && this.estimatedNumRecords != -1) { - this.estimatedNumRecords += estimatedNumRecordForInput; - } - else { - this.estimatedNumRecords = -1; - } - - // sum up estimatedOutputSize for inputs - long estimatedOutputSizeForInput = inputPact.estimatedOutputSize; - - if (estimatedOutputSizeForInput != -1 && this.estimatedOutputSize != -1) { - this.estimatedOutputSize += estimatedOutputSizeForInput; - } - else { - this.estimatedOutputSize = -1; - } + // adjust the partitionings, if they exist but are not equal. this may happen when both channels have a + // partitioning that fulfills the requirements, but both are incompatible. For example may a property requirement + // be ANY_PARTITIONING on fields (0) and one channel is range partitioned on that field, the other is hash + // partitioned on that field. + if (!rgps1.isTrivial() && !(p1.equals(p2))) { + final int dop = getDegreeOfParallelism(); + final int subPerInstance = getSubtasksPerInstance(); + final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1); + final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism(); + final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance(); + final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1); + final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism(); + final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance(); + final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 0 : 1); + final boolean globalDopChange1 = numInstances != inNumInstances1; + final boolean globalDopChange2 = numInstances != inNumInstances2; - //sum up cardinalities or remove them if they are unknown - Set
toRemove = new HashSet
(); - for (Entry cardinality : this.estimatedCardinality.entrySet()) { - long inputCard = inputPact.getEstimatedCardinality(cardinality.getKey()); - if (inputCard == -1) { - toRemove.add(cardinality.getKey()); - } - else { - //to be conservative for joins we use the max for new column cardinality - inputCard = Math.max(inputCard, cardinality.getValue()); - cardinality.setValue(inputCard); + if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() != ShipStrategyType.FORWARD) { + // adjust c2 to c1 + c2 = c2.clone(); + p1.parameterizeChannel(c2,globalDopChange2); + } else if (c2.getShipStrategy() == ShipStrategyType.FORWARD && c1.getShipStrategy() != ShipStrategyType.FORWARD) { + // adjust c1 to c2 + c1 = c1.clone(); + p2.parameterizeChannel(c1,globalDopChange1); + } else if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() == ShipStrategyType.FORWARD) { + boolean adjustC1 = c1.getEstimatedOutputSize() <= 0 || c2.getEstimatedOutputSize() <= 0 || + c1.getEstimatedOutputSize() <= c2.getEstimatedOutputSize(); + if (adjustC1) { + c2 = c2.clone(); + p1.parameterizeChannel(c2, globalDopChange2); + } else { + c1 = c1.clone(); + p2.parameterizeChannel(c1, globalDopChange1); } + } else { + // this should never happen, as it implies both realize a different strategy, which is + // excluded by the check that the required strategies must match + throw new CompilerException("Bug in Plan Enumeration for Union Node."); } - - this.estimatedCardinality.keySet().removeAll(toRemove); - } + super.addLocalCandidates(c1, c2, rgps1, rgps2, target, validLocalCombinations, estimator); + } + + @Override + protected void readStubAnnotations() {} + + @Override + protected void readConstantAnnotation() {} + + @Override + public boolean isFieldConstant(int input, int fieldNumber) { + return true; + } + + @Override + public void computeOutputEstimates(DataStatistics statistics) { + OptimizerNode in1 = getFirstPredecessorNode(); + OptimizerNode in2 = getSecondPredecessorNode(); + this.estimatedNumRecords = in1.estimatedNumRecords > 0 && in2.estimatedNumRecords > 0 ? + in1.estimatedNumRecords + in2.estimatedNumRecords : -1; + this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ? + in1.estimatedOutputSize + in2.estimatedOutputSize : -1; } - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeNumberOfStubCalls() - */ @Override protected long computeNumberOfStubCalls() { return this.estimatedNumRecords; } - - /* - * (non-Javadoc) - * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeNumberOfStubCalls() - */ @Override protected double computeAverageRecordWidth() { if (this.estimatedNumRecords == -1 || this.estimatedOutputSize == -1) return -1; @@ -212,8 +212,7 @@ public class BinaryUnionNode extends TwoInputNode { private static final class MockStub extends AbstractStub {} - private static final class UnionPlaceholderContract extends DualInputContract - { + private static final class UnionPlaceholderContract extends DualInputContract { private UnionPlaceholderContract() { super(MockStub.class, "UnionPlaceholderContract"); } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java index 1e96309dd7d57e731a20888f14123490b02f33d2..b37ffd3eaa21add832264877fe0177a9878a1a2a 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java @@ -328,7 +328,7 @@ public abstract class TwoInputNode extends OptimizerNode { * @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getAlternativePlans() */ @Override - final public List getAlternativePlans(CostEstimator estimator) { + public List getAlternativePlans(CostEstimator estimator) { // check if we have a cached version if (this.cachedPlans != null) { return this.cachedPlans; diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java index 498390e163b9759fbb49f5a738becf2bed64b145..b1fe7d5a06f1960cd806369e9882ff73689a6a02 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java @@ -42,7 +42,7 @@ public class CustomCompensatableDotProductCoGroup extends AbstractStub implement @Override public void open(Configuration parameters) throws Exception { - workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters); + workerIndex = getRuntimeContext().getIndexOfThisSubtask(); currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters); failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters); failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters); diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java index e7d022e32710c2a0b99415949b9d82455fb56f0a..07c8e707f6d47a285aa8ec056142e87302df38df 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java @@ -40,7 +40,7 @@ public class CustomCompensatableDotProductMatch extends AbstractStub implements @Override public void open(Configuration parameters) throws Exception { - int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters); + int workerIndex = getRuntimeContext().getIndexOfThisSubtask(); int currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters); int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters); Set failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters); diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java index ccf8a772dcb4827c33ac3163ded877a269441f14..e199d5cbeab06b94965489ccc37f7ccd25d1b6a5 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java @@ -29,7 +29,7 @@ public class CustomCompensatingMap extends AbstractStub implements GenericMapper int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters); isFailureIteration = currentIteration == failingIteration + 1; - int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters); + int workerIndex = getRuntimeContext().getIndexOfThisSubtask(); Set failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters); isFailingWorker = failingWorkers.contains(workerIndex); diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java index de6f8c3f4e7a77db918444b26a75407760fc2c02..4c2e0b02e0ad17c37914ed02ccb0503254c32c70 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java @@ -67,6 +67,7 @@ public abstract class AbstractIterativePactTask extends Regu // if the class is null, the driver has no user code if (userCodeFunctionType != null && (this.stub == null || REINSTANTIATE_STUB_PER_ITERATION)) { this.stub = initStub(userCodeFunctionType); + this.stub.setRuntimeContext(getRuntimeContext()); } } catch (Exception e) { throw new RuntimeException("Initializing the user code and the configuration failed" + diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java index d5c618c815baeede92eaa36568fcf74e2372200f..d8e87d9f427afa47735cb326fed2d09aa9db3d7e 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java @@ -53,7 +53,9 @@ public enum DriverStrategy // the second input is inner loop, the first input is outer loop and stream-processed NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, MATERIALIZING, false), // the first input is inner loop, the second input is outer loop and stream-processed - NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, PIPELINED, false); + NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, PIPELINED, false), + // union utility op + UNION(null, null, FULL_DAM, FULL_DAM, false); // -------------------------------------------------------------------------------------------- diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java index d79e2bd8937e3487a633e058521bb9256e373f7e..3bd919328f66c02cf05e771c05486307f1cdf526 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java @@ -261,11 +261,7 @@ public class RegularPactTask extends AbstractTask implements LOG.info(formatLogString("Start PACT code.")); // sanity check the input setup - final int numInputs = this.config.getNumInputs(); - if (numInputs != this.driver.getNumberOfInputs()) { - throw new Exception("Inconsistent config data: Number of inputs inconsistent with the driver requirements."); - } - + final int numInputs = this.driver.getNumberOfInputs(); // whatever happens in this scope, make sure that the local strategies are cleaned up! // note that the initialization of the local strategies is in the try-finally block as well, // so that the thread that creates them catches its own errors that may happen in that process. @@ -501,14 +497,17 @@ public class RegularPactTask extends AbstractTask implements final int numInputs = this.driver.getNumberOfInputs(); final MutableReader[] inputReaders = new MutableReader[numInputs]; + int numGates = 0; + for (int i = 0; i < numInputs; i++) { // ---------------- create the input readers --------------------- // in case where a logical input unions multiple physical inputs, create a union reader final int groupSize = this.config.getGroupSize(i); - if (groupSize < 2) { + numGates += groupSize; + if (groupSize == 1) { // non-union case inputReaders[i] = new MutableRecordReader(this); - } else { + } else if (groupSize > 1){ // union case @SuppressWarnings("unchecked") MutableRecordReader[] readers = new MutableRecordReader[groupSize]; @@ -516,9 +515,16 @@ public class RegularPactTask extends AbstractTask implements readers[j] = new MutableRecordReader(this); } inputReaders[i] = new MutableUnionRecordReader(readers); + } else { + throw new Exception("Illegal input group size in task configuration: " + groupSize); } } this.inputReaders = inputReaders; + + // final sanity check + if (numGates != this.config.getNumInputs()) { + throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent."); + } } /** @@ -755,13 +761,12 @@ public class RegularPactTask extends AbstractTask implements if (serializer.getClass() == PactRecordSerializer.class) { // pact record specific deserialization @SuppressWarnings("unchecked") - MutableRecordReader reader = (MutableRecordReader) inputReader; + MutableReader reader = (MutableReader) inputReader; return new PactRecordNepheleReaderIterator(reader, readerInterruptionBehavior(inputIndex)); } else { // generic data type serialization @SuppressWarnings("unchecked") - MutableRecordReader> reader = - (MutableRecordReader>) inputReader; + MutableReader> reader = (MutableReader>) inputReader; @SuppressWarnings({ "unchecked", "rawtypes" }) final MutableObjectIterator iter = new NepheleReaderIterator(reader, serializer, readerInterruptionBehavior(inputIndex)); diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/cancelling/CancellingTestBase.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/cancelling/CancellingTestBase.java index 6d8c859c942a3d3c470d65efff0dbefb3faacc73..d35a7e90597ee055b0f547930b65ba36e17afb3c 100644 --- a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/cancelling/CancellingTestBase.java +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/cancelling/CancellingTestBase.java @@ -190,6 +190,9 @@ public abstract class CancellingTestBase { case CANCELED: exitLoop = true; break; + case SCHEDULED: // okay + case RUNNING: + break; default: throw new Exception("Bug: Unrecognized Job Status."); } diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/CoGroupITCase.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/CoGroupITCase.java index 73b9b2f1b032a569ce330a45a4659d602d98f0c8..a2cb043029cafbdaf613fdc9dc0bbae5365f7d9c 100644 --- a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/CoGroupITCase.java +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/CoGroupITCase.java @@ -50,9 +50,7 @@ import eu.stratosphere.pact.test.util.TestBase; * @author Fabian Hueske */ @RunWith(Parameterized.class) -public class CoGroupITCase extends TestBase - -{ +public class CoGroupITCase extends TestBase { private static final Log LOG = LogFactory.getLog(CoGroupITCase.class); public CoGroupITCase(String clusterConfig, Configuration testConfig) { diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/UnionITCase.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/UnionITCase.java index 9466e29b021fc523169a2e6c87c769fec682d68d..2068daca37aa0b5fb866a70adcc145b7426fdc17 100644 --- a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/UnionITCase.java +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/UnionITCase.java @@ -46,13 +46,8 @@ import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractIT import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseOutputFormat; import eu.stratosphere.pact.test.util.TestBase; -/** - * @author Matthias Ringwald - */ @RunWith(Parameterized.class) -public class UnionITCase extends TestBase - -{ +public class UnionITCase extends TestBase { private static final Log LOG = LogFactory.getLog(UnionITCase.class); public UnionITCase(String clusterConfig, Configuration testConfig) { @@ -107,7 +102,8 @@ public class UnionITCase extends TestBase keyString = record.getField(0, keyString); valueString = record.getField(1, valueString); - LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]"); + if (LOG.isDebugEnabled()) + LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]"); if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) { diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/MergeOnlyJoinITCase.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/MergeOnlyJoinITCase.java index 698193badacc5efff728d2a0ca44cdf0d30674ee..6393334c2d82bdddcb27a284ad56f0f966033c94 100644 --- a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/MergeOnlyJoinITCase.java +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/MergeOnlyJoinITCase.java @@ -15,8 +15,8 @@ package eu.stratosphere.pact.test.pactPrograms; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -118,10 +118,8 @@ public class MergeOnlyJoinITCase extends TestBase { @Override protected void postSubmit() throws Exception { - // Test results compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } @Override @@ -135,8 +133,7 @@ public class MergeOnlyJoinITCase extends TestBase { @Parameters public static Collection getConfigurations() { - - LinkedList tConfigs = new LinkedList(); + ArrayList tConfigs = new ArrayList(); Configuration config = new Configuration(); config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3); @@ -157,7 +154,6 @@ public class MergeOnlyJoinITCase extends TestBase { } private String[] splitInputString(String inputString, char splitChar, int noSplits) { - String splitString = inputString.toString(); String[] splits = new String[noSplits]; int partitionSize = (splitString.length() / noSplits) - 2; @@ -176,9 +172,6 @@ public class MergeOnlyJoinITCase extends TestBase { } splits[noSplits - 1] = splitString; - return splits; - } - }