提交 6c9aa290 编写于 作者: S Stephan Ewen

Add compiler test for solution sets depending on the delta set.

上级 8563d511
......@@ -38,6 +38,8 @@ import org.apache.flink.compiler.plan.BulkIterationPlanNode;
import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
......@@ -45,6 +47,37 @@ import org.apache.flink.util.Collector;
@SuppressWarnings({"serial", "unchecked"})
public class IterationsCompilerTest extends CompilerTestBase {
@Test
public void testSolutionSetDeltaDependsOnBroadcastVariable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, Long>> source =
env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
DataSet<Tuple2<Long, Long>> invariantInput =
env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
// iteration from here
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = source.iterateDelta(source, 1000, 1);
DataSet<Tuple2<Long, Long>> result =
invariantInput
.map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1).types(Long.class, Long.class);
iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result).print();
OptimizedPlan p = compileNoStats(env.createProgramPlan());
new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testTwoIterationsWithMapperInbetween() throws Exception {
try {
......@@ -261,4 +294,12 @@ public class IterationsCompilerTest extends CompilerTestBase {
return new Tuple2<Long, Long>(value.f0, value.f0);
}
}
public static final class DuplicateValueScalar<T> extends MapFunction<T, Tuple2<T, T>> {
@Override
public Tuple2<T, T> map(T value) {
return new Tuple2<T, T>(value, value);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册