提交 6ab2a896 编写于 作者: R Roman Khachatryan 提交者: Piotr Nowojski

[hotfix][tests] Disable alignment timeout by default in UnalignedCheckpointITCase

Most of the bugs in UC are revealed with higher back-pressure which is not created with alignment timeout.
This change disables it by default and adds a new test (p=20) with the timeout enabled.
上级 bd403e2c
......@@ -119,6 +119,10 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
"parallel pipeline with mixed channels, p = 20",
createPipelineSettings(20, 10, true)
},
new Object[] {
"parallel pipeline with mixed channels, p = 20, timeout=1",
createPipelineSettings(20, 10, true, 1)
},
new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
new Object[] {"Parallel cogroup, p = 10", createCogroupSettings(10)},
new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
......@@ -128,6 +132,11 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
private static UnalignedSettings createPipelineSettings(
int parallelism, int slotsPerTaskManager, boolean slotSharing) {
return createPipelineSettings(parallelism, slotsPerTaskManager, slotSharing, 0);
}
private static UnalignedSettings createPipelineSettings(
int parallelism, int slotsPerTaskManager, boolean slotSharing, int timeout) {
int numShuffles = 4;
return new UnalignedSettings(UnalignedCheckpointITCase::createPipeline)
.setParallelism(parallelism)
......@@ -135,7 +144,8 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
.setNumSlots(slotSharing ? parallelism : parallelism * numShuffles)
.setNumBuffers(getNumBuffers(parallelism, numShuffles))
.setSlotsPerTaskManager(slotsPerTaskManager)
.setExpectedFailures(5);
.setExpectedFailures(5)
.setAlignmentTimeout(timeout);
}
private static UnalignedSettings createCogroupSettings(int parallelism) {
......
......@@ -516,6 +516,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
private int numBuffers;
private int expectedFailures = 0;
private final DagCreator dagCreator;
private int alignmentTimeout = 0;
public UnalignedSettings(DagCreator dagCreator) {
this.dagCreator = dagCreator;
......@@ -561,6 +562,11 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
return this;
}
public UnalignedSettings setAlignmentTimeout(int alignmentTimeout) {
this.alignmentTimeout = alignmentTimeout;
return this;
}
public StreamExecutionEnvironment createEnvironment(File checkpointDir) {
Configuration conf = new Configuration();
......@@ -591,7 +597,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
final LocalStreamEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
env.enableCheckpointing(100);
env.getCheckpointConfig().setAlignmentTimeout(1);
env.getCheckpointConfig().setAlignmentTimeout(alignmentTimeout);
env.setParallelism(parallelism);
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册