Commit 2bb3e982 authored by StephanEwen

Add workset iteration translation tests.

Add a check that the solution set is joined on the correct keys.
Parent 96eead00
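For illustration, a minimal sketch of the behaviour the new check enforces, written against the Stratosphere Java API used by the tests below. The snippet and its values are hypothetical, and the exact call in the where/equalTo chain at which the exception surfaces may differ:

// imports as in WorksetIterationsJavaApiCompilerTest below
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L));
// the delta iteration identifies solution set elements by tuple fields 1 and 2
DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = input.iterateDelta(input, 100, 1, 2);
try {
    // joining the solution set on fields (0, 2) instead of (1, 2) is now rejected
    iter.getWorkset().join(iter.getSolutionSet()).where(0, 1).equalTo(0, 2);
} catch (InvalidProgramException e) {
    // expected: the solution set must be joined on its iteration keys
}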
......@@ -102,7 +102,7 @@ public class DefaultCostEstimator extends CostEstimator {
} else {
costs.addNetworkCost(replicationFactor * estOutShipSize);
}
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor);
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor * 100);
}
// --------------------------------------------------------------------------------------------
......
......@@ -395,34 +395,51 @@ public class PlanJSONDumpGenerator {
if (p.getDriverStrategy() != null) {
switch (p.getDriverStrategy()) {
case NONE:
case BINARY_NO_OP:
break;
case UNARY_NO_OP:
locString = "No-Op";
break;
case COLLECTOR_MAP:
case MAP:
case FLAT_MAP:
locString = "Map";
break;
case SORTED_GROUP_COMBINE:
locString = "Sorted Combine";
break;
case SORTED_GROUP_REDUCE:
locString = "Sorted Group Reduce";
case ALL_REDUCE:
locString = "Reduce All";
break;
case ALL_GROUP_REDUCE:
case ALL_GROUP_REDUCE:
case ALL_GROUP_COMBINE:
locString = "Group Reduce All";
break;
case SORTED_REDUCE:
locString = "Sorted Incremental Reduce";
locString = "Sorted Reduce";
break;
case ALL_REDUCE:
locString = "Incremental Reduce All";
case SORTED_PARTIAL_REDUCE:
locString = "Sorted Combine/Reduce";
break;
case SORTED_GROUP_REDUCE:
locString = "Sorted Group Reduce";
break;
case SORTED_GROUP_COMBINE:
locString = "Sorted Combine";
break;
case HYBRIDHASH_BUILD_FIRST:
locString = "Hybrid Hash (build: " + child1name + ")";
break;
case HYBRIDHASH_BUILD_SECOND:
locString = "Hybrid Hash (build: " + child2name + ")";
break;
case NESTEDLOOP_BLOCKED_OUTER_FIRST:
locString = "Nested Loops (Blocked Outer: " + child1name + ")";
break;
......@@ -435,12 +452,15 @@ public class PlanJSONDumpGenerator {
case NESTEDLOOP_STREAMED_OUTER_SECOND:
locString = "Nested Loops (Streamed Outer: " + child2name + ")";
break;
case MERGE:
locString = "Merge";
break;
case CO_GROUP:
locString = "Co-Group";
break;
default:
throw new CompilerException("Unknown local strategy '" + p.getDriverStrategy().name()
+ "' in JSON generator.");
......
......@@ -40,6 +40,7 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
* Tests that validate optimizer choices when using operators that are requesting certain specific execution
* strategies.
*/
@SuppressWarnings("serial")
public class AdditionalOperatorsTest extends CompilerTestBase {
@Test
......
......@@ -48,8 +48,7 @@ import eu.stratosphere.pact.compiler.util.IdentityReduce;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
/**
*/
@SuppressWarnings("serial")
public class BranchingPlansCompilerTest extends CompilerTestBase {
......
......@@ -50,7 +50,9 @@ import eu.stratosphere.util.Visitor;
/**
*
*/
public abstract class CompilerTestBase {
public abstract class CompilerTestBase implements java.io.Serializable {
private static final long serialVersionUID = 1L;
protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
......@@ -64,15 +66,15 @@ public abstract class CompilerTestBase {
// ------------------------------------------------------------------------
protected DataStatistics dataStats;
protected transient DataStatistics dataStats;
protected PactCompiler withStatsCompiler;
protected transient PactCompiler withStatsCompiler;
protected PactCompiler noStatsCompiler;
protected transient PactCompiler noStatsCompiler;
protected InstanceTypeDescription instanceType;
protected transient InstanceTypeDescription instanceType;
private int statCounter;
private transient int statCounter;
// ------------------------------------------------------------------------
......
......@@ -47,6 +47,7 @@ import eu.stratosphere.util.Visitor;
* parallelism between tasks is increased or decreased.
* </ul>
*/
@SuppressWarnings("serial")
public class DOPChangeTest extends CompilerTestBase {
/**
......
......@@ -48,6 +48,7 @@ import eu.stratosphere.types.StringValue;
* This test case has been created to validate that correct strategies are used if orders within groups are
* requested.
*/
@SuppressWarnings("serial")
public class GroupOrderTest extends CompilerTestBase {
@Test
......
......@@ -36,6 +36,7 @@ import eu.stratosphere.types.IntValue;
* <li> Ticket 158
* </ul>
*/
@SuppressWarnings("serial")
public class HardPlansCompilationTest extends CompilerTestBase
{
......
......@@ -32,6 +32,7 @@ import eu.stratosphere.pact.compiler.util.IdentityReduce;
* This test case has been created to validate a bug that occurred when
* the ReduceOperator was used without a grouping key.
*/
@SuppressWarnings("serial")
public class ReduceAllTest extends CompilerTestBase {
@Test
......
......@@ -44,6 +44,7 @@ import eu.stratosphere.util.Collector;
import eu.stratosphere.util.Visitor;
@SuppressWarnings("serial")
public class UnionPropertyPropagationTest extends CompilerTestBase {
@SuppressWarnings("deprecation")
......
/***********************************************************************************************************************
*
* Copyright (C) 2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Iterator;
import org.junit.Test;
import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.DeltaIteration;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.compiler.plan.DualInputPlanNode;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.util.Collector;
/**
* Tests that validate optimizer choices when using operators that are requesting certain specific execution
* strategies.
*/
@SuppressWarnings("serial")
public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
private static final String JOIN_WITH_INVARIANT_NAME = "Test Join Invariant";
private static final String JOIN_WITH_SOLUTION_SET = "Test Join SolutionSet";
private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce Workset";
private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map Delta";
@Test
public void testJavaApiWithDeferredSoltionSetUpdateWithMapper() {
try {
Plan plan = getJavaTestPlan(false, true);
OptimizedPlan oPlan = compileNoStats(plan);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
SingleInputPlanNode deltaMapper = resolver.getNode(SOLUTION_DELTA_MAPPER_NAME);
// iteration preserves partitioning in reducer, so the first partitioning is out of the loop,
// the in-loop partitioning is before the final reducer
// verify joinWithInvariant
assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
// verify joinWithSolutionSet
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
assertEquals(new FieldList(0, 1), joinWithSolutionSetNode.getKeysForInput1());
// verify reducer
assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
assertEquals(new FieldList(1, 2), worksetReducer.getKeys());
// currently, the system may partition before or after the mapper
ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
ShipStrategyType ss2 = deltaMapper.getOutgoingChannels().get(0).getShipStrategy();
assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) ||
(ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
new NepheleJobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Test errored: " + e.getMessage());
}
}
@Test
public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
try {
Plan plan = getJavaTestPlan(false, false);
OptimizedPlan oPlan = compileNoStats(plan);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
// iteration preserves partitioning in reducer, so the first partitioning is out of the loop,
// the in-loop partitioning is before the final reducer
// verify joinWithInvariant
assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
// verify joinWithSolutionSet
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
assertEquals(new FieldList(0, 1), joinWithSolutionSetNode.getKeysForInput1());
// verify reducer
assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
assertEquals(new FieldList(1, 2), worksetReducer.getKeys());
// verify solution delta
assertEquals(2, joinWithSolutionSetNode.getOutgoingChannels().size());
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
new NepheleJobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Test errored: " + e.getMessage());
}
}
@Test
public void testRecordApiWithDirectSoltionSetUpdate() {
try {
Plan plan = getJavaTestPlan(true, false);
OptimizedPlan oPlan = compileNoStats(plan);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
// iteration preserves partitioning in reducer, so the first partitioning is out of the loop,
// the in-loop partitioning is before the final reducer
// verify joinWithInvariant
assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
// verify joinWithSolutionSet
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
assertEquals(new FieldList(0, 1), joinWithSolutionSetNode.getKeysForInput1());
// verify reducer
assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
assertEquals(new FieldList(1, 2), worksetReducer.getKeys());
// verify solution delta
assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size());
assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
new NepheleJobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Test errored: " + e.getMessage());
}
}
@Test
public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input");
DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
DataSet<Tuple3<Long, Long, Long>> result =
iter.getWorkset().join(invariantInput)
.where(1, 2)
.equalTo(1, 2)
.with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return first;
}
});
try {
result.join(iter.getSolutionSet())
.where(1, 0)
.equalTo(0, 2)
.with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return second;
}
});
fail("The join should be rejected with key type mismatches.");
}
catch (InvalidProgramException e) {
// expected!
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Test errored: " + e.getMessage());
}
}
private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input");
DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
DataSet<Tuple3<Long, Long, Long>> joinedWithSolutionSet =
iter.getWorkset().join(invariantInput)
.where(1, 2)
.equalTo(1, 2)
.with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return first;
}
})
.name(JOIN_WITH_INVARIANT_NAME)
.join(iter.getSolutionSet())
.where(1, 0)
.equalTo(1, 2)
.with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return second;
}
})
.name(JOIN_WITH_SOLUTION_SET)
.withConstantSetSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null);
DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2)
.reduceGroup(new GroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
public void reduce(Iterator<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
})
.name(NEXT_WORKSET_REDUCER_NAME)
.withConstantSet("1->1","2->2","0->0");
DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ?
joinedWithSolutionSet.map(new MapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } })
.name(SOLUTION_DELTA_MAPPER_NAME).withConstantSet("0->0","1->1","2->2") :
joinedWithSolutionSet;
iter.closeWith(nextSolutionSet, nextWorkset)
.print();
return env.createProgramPlan();
}
}
......@@ -48,7 +48,9 @@ import eu.stratosphere.types.LongValue;
* Tests that validate optimizer choices when using operators that are requesting certain specific execution
* strategies.
*/
public class WorksetIterationsCompilerTest extends CompilerTestBase {
public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
private static final long serialVersionUID = 1L;
private static final String ITERATION_NAME = "Test Workset Iteration";
private static final String JOIN_WITH_INVARIANT_NAME = "Test Join Invariant";
......@@ -59,8 +61,8 @@ public class WorksetIterationsCompilerTest extends CompilerTestBase {
private final FieldList list0 = new FieldList(0);
@Test
public void testWithDeferredSoltionSetUpdateWithMapper() {
Plan plan = getTestPlan(false, true);
public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() {
Plan plan = getRecordTestPlan(false, true);
OptimizedPlan oPlan;
try {
......@@ -105,8 +107,8 @@ public class WorksetIterationsCompilerTest extends CompilerTestBase {
}
@Test
public void testWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
Plan plan = getTestPlan(false, false);
public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
Plan plan = getRecordTestPlan(false, false);
OptimizedPlan oPlan;
try {
......@@ -149,8 +151,8 @@ public class WorksetIterationsCompilerTest extends CompilerTestBase {
}
@Test
public void testWithDirectSoltionSetUpdate() {
Plan plan = getTestPlan(true, false);
public void testRecordApiWithDirectSoltionSetUpdate() {
Plan plan = getRecordTestPlan(true, false);
OptimizedPlan oPlan;
try {
......@@ -191,7 +193,7 @@ public class WorksetIterationsCompilerTest extends CompilerTestBase {
new NepheleJobGraphGenerator().compileJobGraph(oPlan);
}
private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
FileDataSource solutionSetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Solution Set");
FileDataSource worksetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Workset");
......
......@@ -32,7 +32,7 @@ public class FieldList extends FieldSet {
add(columnIndex);
}
public FieldList(int[] columnIndexes) {
public FieldList(int... columnIndexes) {
this();
addAll(columnIndexes);
}
......
......@@ -14,6 +14,9 @@
**********************************************************************************************************************/
package eu.stratosphere.api.java;
import java.util.Arrays;
import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.java.operators.Keys;
import eu.stratosphere.api.java.typeutils.TypeInformation;
......@@ -42,7 +45,7 @@ public class DeltaIteration<ST, WT> {
DeltaIteration(ExecutionEnvironment context, TypeInformation<ST> type, DataSet<ST> solutionSet, DataSet<WT> workset, Keys<ST> keys, int maxIterations) {
initialSolutionSet = solutionSet;
initialWorkset = workset;
solutionSetPlaceholder = new SolutionSetPlaceHolder<ST>(context, solutionSet.getType());
solutionSetPlaceholder = new SolutionSetPlaceHolder<ST>(context, solutionSet.getType(), this);
worksetPlaceholder = new WorksetPlaceHolder<WT>(context, workset.getType());
this.keys = keys;
this.maxIterations = maxIterations;
......@@ -131,8 +134,19 @@ public class DeltaIteration<ST, WT> {
* @param <ST> The type of the elements in the solution set.
*/
public static class SolutionSetPlaceHolder<ST> extends DataSet<ST>{
private SolutionSetPlaceHolder(ExecutionEnvironment context, TypeInformation<ST> type) {
private final DeltaIteration<ST, ?> deltaIteration;
private SolutionSetPlaceHolder(ExecutionEnvironment context, TypeInformation<ST> type, DeltaIteration<ST, ?> deltaIteration) {
super(context, type);
this.deltaIteration = deltaIteration;
}
public void checkJoinKeyFields(int[] keyFields) {
int[] ssKeys = deltaIteration.keys.computeLogicalKeyPositions();
if (!Arrays.equals(ssKeys, keyFields)) {
throw new InvalidProgramException("The solution set must be joind with using the keys with which elements are identified.");
}
}
}
......
......@@ -20,8 +20,10 @@ import java.util.Arrays;
import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.DeltaIteration.SolutionSetPlaceHolder;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.KeySelector;
import eu.stratosphere.api.java.operators.Keys.FieldPositionKeys;
import eu.stratosphere.api.java.operators.translation.KeyExtractingMapper;
import eu.stratosphere.api.java.operators.translation.PlanJoinOperator;
import eu.stratosphere.api.java.operators.translation.PlanMapOperator;
......@@ -688,6 +690,26 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
throw new InvalidProgramException("The pair of join keys are not compatible with each other.");
}
// sanity check solution set key mismatches
if (input1 instanceof SolutionSetPlaceHolder) {
if (keys1 instanceof FieldPositionKeys) {
int[] positions = ((FieldPositionKeys<?>) keys1).computeLogicalKeyPositions();
((SolutionSetPlaceHolder<?>) input1).checkJoinKeyFields(positions);
} else {
throw new InvalidProgramException("Currently, the solution set may only be joined with using tuple field positions.");
}
}
if (input2 instanceof SolutionSetPlaceHolder) {
if (keys2 instanceof FieldPositionKeys) {
int[] positions = ((FieldPositionKeys<?>) keys2).computeLogicalKeyPositions();
((SolutionSetPlaceHolder<?>) input2).checkJoinKeyFields(positions);
} else {
throw new InvalidProgramException("Currently, the solution set may only be joined with using tuple field positions.");
}
}
return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint);
}
}
......