提交 d39a0ec5 编写于 作者: S sewen

Fixed Union in Optimizer Plan Enumeration.

Fixed Cancelling tests.

Adjusted example programs to use runtime context for subtask information.
上级 11f44c44
......@@ -23,7 +23,11 @@ import eu.stratosphere.pact.common.contract.Order;
import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.util.Utils;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
/**
* This class represents global properties of the data at a certain point in the plan.
......@@ -233,6 +237,26 @@ public class GlobalProperties implements Cloneable
}
return this;
}
public void parameterizeChannel(Channel channel, boolean globalDopChange) {
switch (this.partitioning) {
case RANDOM:
channel.setShipStrategy(globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD);
break;
case FULL_REPLICATION:
channel.setShipStrategy(ShipStrategyType.BROADCAST);
break;
case ANY_PARTITIONING:
case HASH_PARTITIONED:
channel.setShipStrategy(ShipStrategyType.PARTITION_HASH, Utils.createOrderedFromSet(this.partitioningFields));
break;
case RANGE_PARTITIONED:
channel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
break;
default:
throw new CompilerException();
}
}
// ------------------------------------------------------------------------
......
......@@ -21,9 +21,6 @@ import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.costs.Costs;
import eu.stratosphere.pact.compiler.plan.EstimateProvider;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.util.Utils;
......@@ -33,11 +30,15 @@ import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
* This class represents local properties of the data. A local property is a property that exists
* within the data of a single partition.
*/
public final class RequestedLocalProperties implements Cloneable
{
private Ordering ordering; // order inside a partition, null if not ordered
public class RequestedLocalProperties implements Cloneable {
private FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
public static final RequestedLocalProperties DEFAULT_PROPERTIES = null;
// --------------------------------------------------------------------------------------------
Ordering ordering; // order inside a partition, null if not ordered
FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
// --------------------------------------------------------------------------------------------
......@@ -140,8 +141,7 @@ public final class RequestedLocalProperties implements Cloneable
*
* @return True, if the resulting properties are non trivial.
*/
public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode node, int input)
{
public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode node, int input) {
if (this.ordering != null) {
final FieldList involvedIndexes = this.ordering.getInvolvedIndexes();
for (int i = 0; i < involvedIndexes.size(); i++) {
......@@ -168,8 +168,7 @@ public final class RequestedLocalProperties implements Cloneable
* The properties for which to check whether they meet these properties.
* @return True, if the properties are met, false otherwise.
*/
public boolean isMetBy(LocalProperties other)
{
public boolean isMetBy(LocalProperties other) {
if (this.ordering != null) {
// we demand an ordering
return other.getOrdering() != null && this.ordering.isMetBy(other.getOrdering());
......@@ -200,15 +199,7 @@ public final class RequestedLocalProperties implements Cloneable
}
}
public void addMinimalRequiredCosts(Costs to, CostEstimator estimator, EstimateProvider estimate, long memory) {
if (this.ordering != null) {
estimator.addLocalSortCost(estimate, memory, to);
} else if (this.groupedFields != null) {
estimator.addLocalSortCost(estimate, memory, to);
}
}
// ------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
/**
* @param requested1
......@@ -246,12 +237,8 @@ public final class RequestedLocalProperties implements Cloneable
}
}
// ------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
/*
* (non-Javadoc)
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
final int prime = 31;
......@@ -261,10 +248,6 @@ public final class RequestedLocalProperties implements Cloneable
return result;
}
/*
* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (obj instanceof RequestedLocalProperties) {
......@@ -276,19 +259,11 @@ public final class RequestedLocalProperties implements Cloneable
}
}
/*
* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "Requested Local Properties [ordering=" + this.ordering + ", grouped=" + this.groupedFields + "]";
}
/*
* (non-Javadoc)
* @see java.lang.Object#clone()
*/
@Override
public RequestedLocalProperties clone() {
return new RequestedLocalProperties(this.ordering, this.groupedFields);
......
......@@ -33,29 +33,33 @@ import eu.stratosphere.pact.runtime.task.DriverStrategy;
/**
*
*/
public class BinaryUnionOpDescriptor extends OperatorDescriptorDual
{
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.dataproperties.DriverProperties#getStrategy()
*/
public class BinaryUnionOpDescriptor extends OperatorDescriptorDual {
private final RequestedGlobalProperties props;
public BinaryUnionOpDescriptor(RequestedGlobalProperties props) {
super();
if (props == null) {
throw new NullPointerException();
}
this.props = props;
}
@Override
public DriverStrategy getStrategy() {
return DriverStrategy.NONE;
return DriverStrategy.UNION;
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual#createPossibleGlobalProperties()
*/
@Override
protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
// all properties are possible
return Collections.singletonList(new GlobalPropertiesPair(
new RequestedGlobalProperties(), new RequestedGlobalProperties()));
public List<GlobalPropertiesPair> getPossibleGlobalProperties() {
return Collections.singletonList(new GlobalPropertiesPair(this.props, this.props));
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.dataproperties.DriverPropertiesDual#createPossibleLocalProperties()
*/
@Override
protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
return Collections.emptyList();
}
@Override
protected List<LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
......@@ -63,17 +67,11 @@ public class BinaryUnionOpDescriptor extends OperatorDescriptorDual
new RequestedLocalProperties(), new RequestedLocalProperties()));
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.dataproperties.DriverPropertiesDual#instantiate(eu.stratosphere.pact.compiler.plan.candidate.Channel, eu.stratosphere.pact.compiler.plan.candidate.Channel, eu.stratosphere.pact.compiler.plan.TwoInputNode)
*/
@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
return new BinaryUnionPlanNode((BinaryUnionNode) node, in1, in2);
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual#computeGlobalProperties(eu.stratosphere.pact.compiler.dataproperties.GlobalProperties, eu.stratosphere.pact.compiler.dataproperties.GlobalProperties)
*/
@Override
public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
GlobalProperties newProps = new GlobalProperties();
......@@ -88,9 +86,6 @@ public class BinaryUnionOpDescriptor extends OperatorDescriptorDual
return newProps;
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual#computeLocalProperties(eu.stratosphere.pact.compiler.dataproperties.LocalProperties, eu.stratosphere.pact.compiler.dataproperties.LocalProperties)
*/
@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
// all local properties are destroyed
......
......@@ -15,17 +15,22 @@
package eu.stratosphere.pact.compiler.plan;
import java.util.Collections;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.dataproperties.GlobalProperties;
import eu.stratosphere.pact.compiler.dataproperties.InterestingProperties;
import eu.stratosphere.pact.compiler.dataproperties.RequestedGlobalProperties;
import eu.stratosphere.pact.compiler.dataproperties.RequestedLocalProperties;
import eu.stratosphere.pact.compiler.operators.BinaryUnionOpDescriptor;
import eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual;
import eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual.LocalPropertiesPair;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.generic.contract.Contract;
import eu.stratosphere.pact.generic.contract.DualInputContract;
import eu.stratosphere.pact.generic.stub.AbstractStub;
......@@ -37,7 +42,6 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
* input.
*/
public class BinaryUnionNode extends TwoInputNode {
public BinaryUnionNode(OptimizerNode pred1, OptimizerNode pred2) {
super(new UnionPlaceholderContract());
......@@ -58,140 +62,136 @@ public class BinaryUnionNode extends TwoInputNode {
}
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getName()
*/
@Override
public String getName() {
return "Union";
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#setInputs(java.util.Map)
*/
@Override
public void setInputs(Map<Contract, OptimizerNode> contractToNode) {
throw new UnsupportedOperationException();
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.TwoInputNode#getPossibleProperties()
*/
@Override
protected List<OperatorDescriptorDual> getPossibleProperties() {
return Collections.<OperatorDescriptorDual>singletonList(new BinaryUnionOpDescriptor());
return new ArrayList<OperatorDescriptorDual>();
}
@Override
public void computeUnionOfInterestingPropertiesFromSuccessors() {
super.computeUnionOfInterestingPropertiesFromSuccessors();
// clear all local properties, as they are destroyed anyways
getInterestingProperties().getLocalProperties().clear();
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#readStubAnnotations()
*/
@Override
protected void readStubAnnotations() {
//DO NOTHING
//do not read annotations for union nodes, as this node is artificially generated
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#readConstantAnnotation()
*/
@Override
protected void readConstantAnnotation() {
//DO NOTHING
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#isFieldConstant(int, int)
*/
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
return true;
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
final InterestingProperties props = getInterestingProperties();
// if no other properties exist, add the pruned trivials back
if (props.getGlobalProperties().isEmpty()) {
props.addGlobalProperties(new RequestedGlobalProperties());
}
props.addLocalProperties(new RequestedLocalProperties());
this.input1.setInterestingProperties(props.clone());
this.input2.setInterestingProperties(props.clone());
// make these interesting properties the only allowed properties for the candidates
this.possibleProperties.clear();
for (RequestedGlobalProperties gprops : props.getGlobalProperties()) {
this.possibleProperties.add(new BinaryUnionOpDescriptor(gprops));
}
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeOutputEstimates(eu.stratosphere.pact.compiler.DataStatistics)
*/
@Override
public void computeOutputEstimates(DataStatistics statistics) {
this.estimatedNumRecords = 0;
this.estimatedOutputSize = 0;
// init estimated cardinalities with the fields of the first input
// remove the field which are unknown for other inputs later on
for (FieldSet fieldSet : getIncomingConnections().get(0).getSource().getEstimatedCardinalities().keySet()) {
this.estimatedCardinality.put(fieldSet, -1L);
protected void addLocalCandidates(Channel c1, Channel c2, RequestedGlobalProperties rgps1,
RequestedGlobalProperties rgps2, List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations,
CostEstimator estimator)
{
if (!(rgps1.equals(rgps2))) {
return;
}
// get the global properties and clear unique fields (not preserved anyways during the union)
GlobalProperties p1 = c1.getGlobalProperties();
GlobalProperties p2 = c2.getGlobalProperties();
p1.clearUniqueFieldCombinations();
p2.clearUniqueFieldCombinations();
for (PactConnection inConn : getIncomingConnections()) {
OptimizerNode inputPact = inConn.getSource();
// sum up estimatedNumRecords for inputs
long estimatedNumRecordForInput = inputPact.estimatedNumRecords;
if (estimatedNumRecordForInput != -1 && this.estimatedNumRecords != -1) {
this.estimatedNumRecords += estimatedNumRecordForInput;
}
else {
this.estimatedNumRecords = -1;
}
// sum up estimatedOutputSize for inputs
long estimatedOutputSizeForInput = inputPact.estimatedOutputSize;
if (estimatedOutputSizeForInput != -1 && this.estimatedOutputSize != -1) {
this.estimatedOutputSize += estimatedOutputSizeForInput;
}
else {
this.estimatedOutputSize = -1;
}
// adjust the partitionings, if they exist but are not equal. this may happen when both channels have a
// partitioning that fulfills the requirements, but both are incompatible. For example may a property requirement
// be ANY_PARTITIONING on fields (0) and one channel is range partitioned on that field, the other is hash
// partitioned on that field.
if (!rgps1.isTrivial() && !(p1.equals(p2))) {
final int dop = getDegreeOfParallelism();
final int subPerInstance = getSubtasksPerInstance();
final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance();
final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1);
final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance();
final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 0 : 1);
final boolean globalDopChange1 = numInstances != inNumInstances1;
final boolean globalDopChange2 = numInstances != inNumInstances2;
//sum up cardinalities or remove them if they are unknown
Set<FieldSet> toRemove = new HashSet<FieldSet>();
for (Entry<FieldSet, Long> cardinality : this.estimatedCardinality.entrySet()) {
long inputCard = inputPact.getEstimatedCardinality(cardinality.getKey());
if (inputCard == -1) {
toRemove.add(cardinality.getKey());
}
else {
//to be conservative for joins we use the max for new column cardinality
inputCard = Math.max(inputCard, cardinality.getValue());
cardinality.setValue(inputCard);
if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() != ShipStrategyType.FORWARD) {
// adjust c2 to c1
c2 = c2.clone();
p1.parameterizeChannel(c2,globalDopChange2);
} else if (c2.getShipStrategy() == ShipStrategyType.FORWARD && c1.getShipStrategy() != ShipStrategyType.FORWARD) {
// adjust c1 to c2
c1 = c1.clone();
p2.parameterizeChannel(c1,globalDopChange1);
} else if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() == ShipStrategyType.FORWARD) {
boolean adjustC1 = c1.getEstimatedOutputSize() <= 0 || c2.getEstimatedOutputSize() <= 0 ||
c1.getEstimatedOutputSize() <= c2.getEstimatedOutputSize();
if (adjustC1) {
c2 = c2.clone();
p1.parameterizeChannel(c2, globalDopChange2);
} else {
c1 = c1.clone();
p2.parameterizeChannel(c1, globalDopChange1);
}
} else {
// this should never happen, as it implies both realize a different strategy, which is
// excluded by the check that the required strategies must match
throw new CompilerException("Bug in Plan Enumeration for Union Node.");
}
this.estimatedCardinality.keySet().removeAll(toRemove);
}
super.addLocalCandidates(c1, c2, rgps1, rgps2, target, validLocalCombinations, estimator);
}
@Override
protected void readStubAnnotations() {}
@Override
protected void readConstantAnnotation() {}
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
return true;
}
@Override
public void computeOutputEstimates(DataStatistics statistics) {
OptimizerNode in1 = getFirstPredecessorNode();
OptimizerNode in2 = getSecondPredecessorNode();
this.estimatedNumRecords = in1.estimatedNumRecords > 0 && in2.estimatedNumRecords > 0 ?
in1.estimatedNumRecords + in2.estimatedNumRecords : -1;
this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ?
in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeNumberOfStubCalls()
*/
@Override
protected long computeNumberOfStubCalls() {
return this.estimatedNumRecords;
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeNumberOfStubCalls()
*/
@Override
protected double computeAverageRecordWidth() {
if (this.estimatedNumRecords == -1 || this.estimatedOutputSize == -1) return -1;
......@@ -212,8 +212,7 @@ public class BinaryUnionNode extends TwoInputNode {
private static final class MockStub extends AbstractStub {}
private static final class UnionPlaceholderContract extends DualInputContract<MockStub>
{
private static final class UnionPlaceholderContract extends DualInputContract<MockStub> {
private UnionPlaceholderContract() {
super(MockStub.class, "UnionPlaceholderContract");
}
......
......@@ -328,7 +328,7 @@ public abstract class TwoInputNode extends OptimizerNode {
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getAlternativePlans()
*/
@Override
final public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
// check if we have a cached version
if (this.cachedPlans != null) {
return this.cachedPlans;
......
......@@ -42,7 +42,7 @@ public class CustomCompensatableDotProductCoGroup extends AbstractStub implement
@Override
public void open(Configuration parameters) throws Exception {
workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters);
workerIndex = getRuntimeContext().getIndexOfThisSubtask();
currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters);
failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
......
......@@ -40,7 +40,7 @@ public class CustomCompensatableDotProductMatch extends AbstractStub implements
@Override
public void open(Configuration parameters) throws Exception {
int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters);
int workerIndex = getRuntimeContext().getIndexOfThisSubtask();
int currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters);
int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
Set<Integer> failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
......
......@@ -29,7 +29,7 @@ public class CustomCompensatingMap extends AbstractStub implements GenericMapper
int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
isFailureIteration = currentIteration == failingIteration + 1;
int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters);
int workerIndex = getRuntimeContext().getIndexOfThisSubtask();
Set<Integer> failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
isFailingWorker = failingWorkers.contains(workerIndex);
......
......@@ -67,6 +67,7 @@ public abstract class AbstractIterativePactTask<S extends Stub, OT> extends Regu
// if the class is null, the driver has no user code
if (userCodeFunctionType != null && (this.stub == null || REINSTANTIATE_STUB_PER_ITERATION)) {
this.stub = initStub(userCodeFunctionType);
this.stub.setRuntimeContext(getRuntimeContext());
}
} catch (Exception e) {
throw new RuntimeException("Initializing the user code and the configuration failed" +
......
......@@ -53,7 +53,9 @@ public enum DriverStrategy
// the second input is inner loop, the first input is outer loop and stream-processed
NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, MATERIALIZING, false),
// the first input is inner loop, the second input is outer loop and stream-processed
NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, PIPELINED, false);
NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, PIPELINED, false),
// union utility op
UNION(null, null, FULL_DAM, FULL_DAM, false);
// --------------------------------------------------------------------------------------------
......
......@@ -261,11 +261,7 @@ public class RegularPactTask<S extends Stub, OT> extends AbstractTask implements
LOG.info(formatLogString("Start PACT code."));
// sanity check the input setup
final int numInputs = this.config.getNumInputs();
if (numInputs != this.driver.getNumberOfInputs()) {
throw new Exception("Inconsistent config data: Number of inputs inconsistent with the driver requirements.");
}
final int numInputs = this.driver.getNumberOfInputs();
// whatever happens in this scope, make sure that the local strategies are cleaned up!
// note that the initialization of the local strategies is in the try-finally block as well,
// so that the thread that creates them catches its own errors that may happen in that process.
......@@ -501,14 +497,17 @@ public class RegularPactTask<S extends Stub, OT> extends AbstractTask implements
final int numInputs = this.driver.getNumberOfInputs();
final MutableReader<?>[] inputReaders = new MutableReader[numInputs];
int numGates = 0;
for (int i = 0; i < numInputs; i++) {
// ---------------- create the input readers ---------------------
// in case where a logical input unions multiple physical inputs, create a union reader
final int groupSize = this.config.getGroupSize(i);
if (groupSize < 2) {
numGates += groupSize;
if (groupSize == 1) {
// non-union case
inputReaders[i] = new MutableRecordReader<Record>(this);
} else {
} else if (groupSize > 1){
// union case
@SuppressWarnings("unchecked")
MutableRecordReader<Record>[] readers = new MutableRecordReader[groupSize];
......@@ -516,9 +515,16 @@ public class RegularPactTask<S extends Stub, OT> extends AbstractTask implements
readers[j] = new MutableRecordReader<Record>(this);
}
inputReaders[i] = new MutableUnionRecordReader<Record>(readers);
} else {
throw new Exception("Illegal input group size in task configuration: " + groupSize);
}
}
this.inputReaders = inputReaders;
// final sanity check
if (numGates != this.config.getNumInputs()) {
throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
}
}
/**
......@@ -755,13 +761,12 @@ public class RegularPactTask<S extends Stub, OT> extends AbstractTask implements
if (serializer.getClass() == PactRecordSerializer.class) {
// pact record specific deserialization
@SuppressWarnings("unchecked")
MutableRecordReader<PactRecord> reader = (MutableRecordReader<PactRecord>) inputReader;
MutableReader<PactRecord> reader = (MutableReader<PactRecord>) inputReader;
return new PactRecordNepheleReaderIterator(reader, readerInterruptionBehavior(inputIndex));
} else {
// generic data type serialization
@SuppressWarnings("unchecked")
MutableRecordReader<DeserializationDelegate<?>> reader =
(MutableRecordReader<DeserializationDelegate<?>>) inputReader;
MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
@SuppressWarnings({ "unchecked", "rawtypes" })
final MutableObjectIterator<?> iter = new NepheleReaderIterator(reader, serializer,
readerInterruptionBehavior(inputIndex));
......
......@@ -190,6 +190,9 @@ public abstract class CancellingTestBase {
case CANCELED:
exitLoop = true;
break;
case SCHEDULED: // okay
case RUNNING:
break;
default:
throw new Exception("Bug: Unrecognized Job Status.");
}
......
......@@ -50,9 +50,7 @@ import eu.stratosphere.pact.test.util.TestBase;
* @author Fabian Hueske
*/
@RunWith(Parameterized.class)
public class CoGroupITCase extends TestBase
{
public class CoGroupITCase extends TestBase {
private static final Log LOG = LogFactory.getLog(CoGroupITCase.class);
public CoGroupITCase(String clusterConfig, Configuration testConfig) {
......
......@@ -46,13 +46,8 @@ import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractIT
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
import eu.stratosphere.pact.test.util.TestBase;
/**
* @author Matthias Ringwald
*/
@RunWith(Parameterized.class)
public class UnionITCase extends TestBase
{
public class UnionITCase extends TestBase {
private static final Log LOG = LogFactory.getLog(UnionITCase.class);
public UnionITCase(String clusterConfig, Configuration testConfig) {
......@@ -107,7 +102,8 @@ public class UnionITCase extends TestBase
keyString = record.getField(0, keyString);
valueString = record.getField(1, valueString);
LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
if (LOG.isDebugEnabled())
LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) {
......
......@@ -15,8 +15,8 @@
package eu.stratosphere.pact.test.pactPrograms;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -118,10 +118,8 @@ public class MergeOnlyJoinITCase extends TestBase {
@Override
protected void postSubmit() throws Exception {
// Test results
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
@Override
......@@ -135,8 +133,7 @@ public class MergeOnlyJoinITCase extends TestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
ArrayList<Configuration> tConfigs = new ArrayList<Configuration>();
Configuration config = new Configuration();
config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3);
......@@ -157,7 +154,6 @@ public class MergeOnlyJoinITCase extends TestBase {
}
private String[] splitInputString(String inputString, char splitChar, int noSplits) {
String splitString = inputString.toString();
String[] splits = new String[noSplits];
int partitionSize = (splitString.length() / noSplits) - 2;
......@@ -176,9 +172,6 @@ public class MergeOnlyJoinITCase extends TestBase {
}
splits[noSplits - 1] = splitString;
return splits;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册