From 6ab2a89619d797eee217b7eb4364a259931279d0 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Mon, 28 Dec 2020 15:23:19 +0100 Subject: [PATCH] [hotfix][tests] Disable alignment timeout by default in UnalignedCheckpointITCase Most of the bugs in UC are revealed with higher back-pressure which is not created with alignment timeout. This change disables it by default and adds a new test (p=20) with the timeout enabled. --- .../checkpointing/UnalignedCheckpointITCase.java | 12 +++++++++++- .../checkpointing/UnalignedCheckpointTestBase.java | 8 +++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java index 73a1458909e..903998ae62f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java @@ -119,6 +119,10 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase { "parallel pipeline with mixed channels, p = 20", createPipelineSettings(20, 10, true) }, + new Object[] { + "parallel pipeline with mixed channels, p = 20, timeout=1", + createPipelineSettings(20, 10, true, 1) + }, new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)}, new Object[] {"Parallel cogroup, p = 10", createCogroupSettings(10)}, new Object[] {"Parallel union, p = 5", createUnionSettings(5)}, @@ -128,6 +132,11 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase { private static UnalignedSettings createPipelineSettings( int parallelism, int slotsPerTaskManager, boolean slotSharing) { + return createPipelineSettings(parallelism, slotsPerTaskManager, slotSharing, 0); + } + + private static UnalignedSettings createPipelineSettings( + int parallelism, int slotsPerTaskManager, boolean slotSharing, int timeout) { int numShuffles = 4; return new UnalignedSettings(UnalignedCheckpointITCase::createPipeline) .setParallelism(parallelism) @@ -135,7 +144,8 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase { .setNumSlots(slotSharing ? parallelism : parallelism * numShuffles) .setNumBuffers(getNumBuffers(parallelism, numShuffles)) .setSlotsPerTaskManager(slotsPerTaskManager) - .setExpectedFailures(5); + .setExpectedFailures(5) + .setAlignmentTimeout(timeout); } private static UnalignedSettings createCogroupSettings(int parallelism) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index bb5a9fab632..ec550209f37 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -516,6 +516,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { private int numBuffers; private int expectedFailures = 0; private final DagCreator dagCreator; + private int alignmentTimeout = 0; public UnalignedSettings(DagCreator dagCreator) { this.dagCreator = dagCreator; @@ -561,6 +562,11 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { return this; } + public UnalignedSettings setAlignmentTimeout(int alignmentTimeout) { + this.alignmentTimeout = alignmentTimeout; + return this; + } + public StreamExecutionEnvironment createEnvironment(File checkpointDir) { Configuration conf = new Configuration(); @@ -591,7 +597,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf); env.enableCheckpointing(100); - env.getCheckpointConfig().setAlignmentTimeout(1); + env.getCheckpointConfig().setAlignmentTimeout(alignmentTimeout); env.setParallelism(parallelism); env.setRestartStrategy( RestartStrategies.fixedDelayRestart( -- GitLab