提交 3002258f 编写于 作者: S Stephan Ewen

[FLINK-1018] Fix cross pipelining/daming info to resolve cross-related streaming deadlocks.

上级 ec0b00d6
......@@ -23,12 +23,13 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.IterativeDataSet;
import org.apache.flink.compiler.plan.BulkIterationPlanNode;
import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.SingleInputPlanNode;
import org.apache.flink.compiler.plan.SinkPlanNode;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.compiler.testfunctions.SelectOneReducer;
import org.apache.flink.configuration.Configuration;
@SuppressWarnings("serial")
public class PipelineBreakerTest extends CompilerTestBase {
......@@ -134,4 +135,104 @@ public class PipelineBreakerTest extends CompilerTestBase {
fail(e.getMessage());
}
}
@Test
public void testPilelineBreakerWithCross() {
try {
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf= new Configuration();
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
SinkPlanNode sink = op.getDataSinks().iterator().next();
DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
}
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf= new Configuration();
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
SinkPlanNode sink = op.getDataSinks().iterator().next();
DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
}
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf= new Configuration();
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
SinkPlanNode sink = op.getDataSinks().iterator().next();
DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
}
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf= new Configuration();
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
SinkPlanNode sink = op.getDataSinks().iterator().next();
DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
......@@ -80,13 +80,13 @@ public enum DriverStrategy {
HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING, FULL_DAM, true),
// the second input is inner loop, the first input is outer loop and block-wise processed
NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, MATERIALIZING, false),
NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, false),
// the first input is inner loop, the second input is outer loop and block-wise processed
NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, MATERIALIZING, false),
NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, MATERIALIZING, false),
// the second input is inner loop, the first input is outer loop and stream-processed
NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, MATERIALIZING, false),
NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, FULL_DAM, false),
// the first input is inner loop, the second input is outer loop and stream-processed
NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, PIPELINED, false),
NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, PIPELINED, false),
// union utility op. unions happen implicitly on the network layer (in the readers) when bundeling streams
UNION(null, null, FULL_DAM, FULL_DAM, false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册