提交 4285102e 编写于 作者: S StephanEwen

Added extra versions of the Cross contract that determine which side should be...

Added extra versions of the Cross contract that determine which side should be broadcastet and how to pisk the local strategies.
上级 1096d71e
......@@ -99,7 +99,7 @@ public class CrossContract extends GenericCrossContract<CrossStub> implements Re
*
* @param udf The {@link CrossStub} implementation for this Cross contract.
*/
private Builder(UserCodeWrapper<CrossStub> udf) {
protected Builder(UserCodeWrapper<CrossStub> udf) {
this.udf = udf;
this.inputs1 = new ArrayList<Contract>();
this.inputs2 = new ArrayList<Contract>();
......
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.common.contract;
import eu.stratosphere.pact.common.stubs.CrossStub;
import eu.stratosphere.pact.generic.contract.GenericCrossContract.CrossWithLarge;
import eu.stratosphere.pact.generic.contract.UserCodeClassWrapper;
import eu.stratosphere.pact.generic.contract.UserCodeObjectWrapper;
import eu.stratosphere.pact.generic.contract.UserCodeWrapper;
/**
* This operator represents a Cartesian-Product operation. Of the two inputs, the first is expected to be large
* and the second is expected to be small.
*
* @see CrossStub
*/
public class CrossWithLargeContract extends CrossContract implements CrossWithLarge {
/**
* Creates a Builder with the provided {@link CrossStub} implementation.
*
* @param udf The {@link CrossStub} implementation for this Cross contract.
*/
public static Builder builder(CrossStub udf) {
return new Builder(new UserCodeObjectWrapper<CrossStub>(udf));
}
/**
* Creates a Builder with the provided {@link CrossStub} implementation.
*
* @param udf The {@link CrossStub} implementation for this Cross contract.
*/
public static Builder builder(Class<? extends CrossStub> udf) {
return new Builder(new UserCodeClassWrapper<CrossStub>(udf));
}
/**
* The private constructor that only gets invoked from the Builder.
* @param builder
*/
protected CrossWithLargeContract(Builder builder) {
super(builder);
}
// --------------------------------------------------------------------------------------------
/**
* Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
*/
public static class Builder extends CrossContract.Builder {
/**
* Creates a Builder with the provided {@link CrossStub} implementation.
*
* @param udf The {@link CrossStub} implementation for this Cross contract.
*/
private Builder(UserCodeWrapper<CrossStub> udf) {
super(udf);
}
/**
* Creates and returns a CrossContract from using the values given
* to the builder.
*
* @return The created contract
*/
public CrossWithLargeContract build() {
return new CrossWithLargeContract(this);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.common.contract;
import eu.stratosphere.pact.common.stubs.CrossStub;
import eu.stratosphere.pact.generic.contract.GenericCrossContract.CrossWithSmall;
import eu.stratosphere.pact.generic.contract.UserCodeClassWrapper;
import eu.stratosphere.pact.generic.contract.UserCodeObjectWrapper;
import eu.stratosphere.pact.generic.contract.UserCodeWrapper;
/**
* This operator represents a Cartesian-Product operation. Of the two inputs, the first is expected to be large
* and the second is expected to be small.
*
* @see CrossStub
*/
public class CrossWithSmallContract extends CrossContract implements CrossWithSmall {
/**
* Creates a Builder with the provided {@link CrossStub} implementation.
*
* @param udf The {@link CrossStub} implementation for this Cross contract.
*/
public static Builder builder(CrossStub udf) {
return new Builder(new UserCodeObjectWrapper<CrossStub>(udf));
}
/**
* Creates a Builder with the provided {@link CrossStub} implementation.
*
* @param udf The {@link CrossStub} implementation for this Cross contract.
*/
public static Builder builder(Class<? extends CrossStub> udf) {
return new Builder(new UserCodeClassWrapper<CrossStub>(udf));
}
/**
* The private constructor that only gets invoked from the Builder.
* @param builder
*/
protected CrossWithSmallContract(Builder builder) {
super(builder);
}
// --------------------------------------------------------------------------------------------
/**
* Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
*/
public static class Builder extends CrossContract.Builder {
/**
* Creates a Builder with the provided {@link CrossStub} implementation.
*
* @param udf The {@link CrossStub} implementation for this Cross contract.
*/
private Builder(UserCodeWrapper<CrossStub> udf) {
super(udf);
}
/**
* Creates and returns a CrossContract from using the values given
* to the builder.
*
* @return The created contract
*/
public CrossWithSmallContract build() {
return new CrossWithSmallContract(this);
}
}
}
......@@ -30,8 +30,8 @@ import eu.stratosphere.pact.generic.stub.GenericCrosser;
*
* @see CrossStub
*/
public class GenericCrossContract<T extends GenericCrosser<?, ?, ?>> extends DualInputContract<T>
{
public class GenericCrossContract<T extends GenericCrosser<?, ?, ?>> extends DualInputContract<T> {
public GenericCrossContract(UserCodeWrapper<T> udf, String name) {
super(udf, name);
}
......@@ -43,4 +43,16 @@ public class GenericCrossContract<T extends GenericCrosser<?, ?, ?>> extends Dua
public GenericCrossContract(Class<? extends T> udf, String name) {
this(new UserCodeClassWrapper<T>(udf), name);
}
// --------------------------------------------------------------------------------------------
/**
* Marker interface to declare the second input as the smaller one.
*/
public static interface CrossWithSmall {}
/**
* Marker interface to declare the second input as the larger one.
*/
public static interface CrossWithLarge {}
}
......@@ -28,26 +28,42 @@ import eu.stratosphere.pact.compiler.plan.TwoInputNode;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.DualInputPlanNode;
/**
*
*/
public abstract class CartesianProductDescriptor extends OperatorDescriptorDual {
private final boolean allowBroadcastFirst;
private final boolean allowBroadcastSecond;
protected CartesianProductDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) {
if (!(allowBroadcastFirst | allowBroadcastSecond))
throw new IllegalArgumentException();
this.allowBroadcastFirst = allowBroadcastFirst;
this.allowBroadcastSecond = allowBroadcastSecond;
}
@Override
protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
ArrayList<GlobalPropertiesPair> pairs = new ArrayList<GlobalPropertiesPair>();
{ // replicate second
RequestedGlobalProperties any1 = new RequestedGlobalProperties();
RequestedGlobalProperties replicated2 = new RequestedGlobalProperties();
replicated2.setFullyReplicated();
pairs.add(new GlobalPropertiesPair(any1, replicated2));
}
{ // replicate first
if (this.allowBroadcastFirst) {
// replicate first
RequestedGlobalProperties replicated1 = new RequestedGlobalProperties();
replicated1.setFullyReplicated();
RequestedGlobalProperties any2 = new RequestedGlobalProperties();
pairs.add(new GlobalPropertiesPair(replicated1, any2));
}
if (this.allowBroadcastSecond) {
// replicate second
RequestedGlobalProperties any1 = new RequestedGlobalProperties();
RequestedGlobalProperties replicated2 = new RequestedGlobalProperties();
replicated2.setFullyReplicated();
pairs.add(new GlobalPropertiesPair(any1, replicated2));
}
return pairs;
}
......
......@@ -23,6 +23,14 @@ import eu.stratosphere.pact.runtime.task.DriverStrategy;
*/
public class CrossBlockOuterFirstDescriptor extends CartesianProductDescriptor {
public CrossBlockOuterFirstDescriptor() {
this(true, true);
}
public CrossBlockOuterFirstDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) {
super(allowBroadcastFirst, allowBroadcastSecond);
}
@Override
public DriverStrategy getStrategy() {
return DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST;
......
......@@ -23,6 +23,14 @@ import eu.stratosphere.pact.runtime.task.DriverStrategy;
*/
public class CrossBlockOuterSecondDescriptor extends CartesianProductDescriptor {
public CrossBlockOuterSecondDescriptor() {
this(true, true);
}
public CrossBlockOuterSecondDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) {
super(allowBroadcastFirst, allowBroadcastSecond);
}
@Override
public DriverStrategy getStrategy() {
return DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND;
......
......@@ -23,6 +23,14 @@ import eu.stratosphere.pact.runtime.task.DriverStrategy;
*/
public class CrossStreamOuterFirstDescriptor extends CartesianProductDescriptor {
public CrossStreamOuterFirstDescriptor() {
this(true, true);
}
public CrossStreamOuterFirstDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) {
super(allowBroadcastFirst, allowBroadcastSecond);
}
@Override
public DriverStrategy getStrategy() {
return DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST;
......
......@@ -23,6 +23,14 @@ import eu.stratosphere.pact.runtime.task.DriverStrategy;
*/
public class CrossStreamOuterSecondDescriptor extends CartesianProductDescriptor {
public CrossStreamOuterSecondDescriptor() {
this(true, true);
}
public CrossStreamOuterSecondDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) {
super(allowBroadcastFirst, allowBroadcastSecond);
}
@Override
public DriverStrategy getStrategy() {
return DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND;
......
......@@ -33,8 +33,8 @@ import eu.stratosphere.pact.generic.contract.GenericCrossContract;
/**
* The Optimizer representation of a <i>Cross</i> contract node.
*/
public class CrossNode extends TwoInputNode
{
public class CrossNode extends TwoInputNode {
/**
* Creates a new CrossNode for the given contract.
*
......@@ -56,46 +56,55 @@ public class CrossNode extends TwoInputNode
return (GenericCrossContract<?>) super.getPactContract();
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getName()
*/
@Override
public String getName() {
return "Cross";
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.TwoInputNode#getPossibleProperties()
*/
@Override
protected List<OperatorDescriptorDual> getPossibleProperties() {
Configuration conf = getPactContract().getParameters();
String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
if (localStrategy != null) {
final OperatorDescriptorDual fixedDriverStrat;
if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
fixedDriverStrat = new CrossBlockOuterFirstDescriptor();
} else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) {
fixedDriverStrat = new CrossBlockOuterSecondDescriptor();
} else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) {
fixedDriverStrat = new CrossStreamOuterFirstDescriptor();
} else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) {
fixedDriverStrat = new CrossStreamOuterSecondDescriptor();
} else {
throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy);
}
return Collections.singletonList(fixedDriverStrat);
} else {
GenericCrossContract<?> operation = getPactContract();
if (operation instanceof GenericCrossContract.CrossWithSmall) {
ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
list.add(new CrossBlockOuterFirstDescriptor());
list.add(new CrossBlockOuterSecondDescriptor());
list.add(new CrossStreamOuterFirstDescriptor());
list.add(new CrossStreamOuterSecondDescriptor());
list.add(new CrossBlockOuterSecondDescriptor(false, true));
list.add(new CrossStreamOuterFirstDescriptor(false, true));
return list;
}
else if (operation instanceof GenericCrossContract.CrossWithLarge) {
ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
list.add(new CrossBlockOuterFirstDescriptor(true, false));
list.add(new CrossStreamOuterSecondDescriptor(true, false));
return list;
}
else {
Configuration conf = operation.getParameters();
String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
if (localStrategy != null) {
final OperatorDescriptorDual fixedDriverStrat;
if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
fixedDriverStrat = new CrossBlockOuterFirstDescriptor();
} else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) {
fixedDriverStrat = new CrossBlockOuterSecondDescriptor();
} else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) {
fixedDriverStrat = new CrossStreamOuterFirstDescriptor();
} else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) {
fixedDriverStrat = new CrossStreamOuterSecondDescriptor();
} else {
throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy);
}
return Collections.singletonList(fixedDriverStrat);
} else {
ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
list.add(new CrossBlockOuterFirstDescriptor());
list.add(new CrossBlockOuterSecondDescriptor());
list.add(new CrossStreamOuterFirstDescriptor());
list.add(new CrossStreamOuterSecondDescriptor());
return list;
}
}
}
/**
......
/***********************************************************************************************************************
*
* Copyright (C) 2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Test;
import eu.stratosphere.pact.common.contract.CrossContract;
import eu.stratosphere.pact.common.contract.CrossWithLargeContract;
import eu.stratosphere.pact.common.contract.CrossWithSmallContract;
import eu.stratosphere.pact.common.contract.FileDataSink;
import eu.stratosphere.pact.common.contract.FileDataSource;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.DualInputPlanNode;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.compiler.util.DummyCrossStub;
import eu.stratosphere.pact.compiler.util.DummyInputFormat;
import eu.stratosphere.pact.compiler.util.DummyOutputFormat;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
/**
* Tests that validate optimizer choices when using operators that are requesting certain specific execution
* strategies.
*/
public class AdditionalOperatorsTest extends CompilerTestBase {
@Test
public void testCrossWithSmall() {
// construct the plan
FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
CrossContract cross = CrossWithSmallContract.builder(new DummyCrossStub())
.input1(source1).input2(source2)
.name("Cross").build();
FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
Plan plan = new Plan(sink);
plan.setDefaultParallelism(DEFAULT_PARALLELISM);
try {
OptimizedPlan oPlan = compileNoStats(plan);
OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
Channel in1 = crossPlanNode.getInput1();
Channel in2 = crossPlanNode.getInput2();
assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy());
assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy());
} catch(CompilerException ce) {
ce.printStackTrace();
fail("The pact compiler is unable to compile this plan correctly.");
}
}
@Test
public void testCrossWithLarge() {
// construct the plan
FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
CrossContract cross= CrossWithLargeContract.builder(new DummyCrossStub())
.input1(source1).input2(source2)
.name("Cross").build();
FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
Plan plan = new Plan(sink);
plan.setDefaultParallelism(DEFAULT_PARALLELISM);
try {
OptimizedPlan oPlan = compileNoStats(plan);
OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
Channel in1 = crossPlanNode.getInput1();
Channel in2 = crossPlanNode.getInput2();
assertEquals(ShipStrategyType.BROADCAST, in1.getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, in2.getShipStrategy());
} catch(CompilerException ce) {
ce.printStackTrace();
fail("The pact compiler is unable to compile this plan correctly.");
}
}
}
......@@ -15,7 +15,7 @@
package eu.stratosphere.pact.compiler;
import static org.junit.Assert.*;
import static org.junit.Assert.fail;
import org.junit.Test;
......@@ -29,12 +29,11 @@ import eu.stratosphere.pact.compiler.util.DummyInputFormat;
import eu.stratosphere.pact.compiler.util.DummyOutputFormat;
import eu.stratosphere.pact.compiler.util.IdentityReduce;
/**
* This test case has been created to validate a bug that occurred when
* the ReduceContract was used without a grouping key.
*/
public class ReduceAllTest extends CompilerTestBase {
public class ReduceAllTest extends CompilerTestBase {
@Test
public void testReduce() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册