diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java index dbf102d9839710c0d192bb7db9c003ee2bf598ea..5cac5e7b8eeb1a46c00bcb2020054049d0e5f3bc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java @@ -21,6 +21,8 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -34,7 +36,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.util.TestLogger; import org.hamcrest.Matchers; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -50,7 +51,6 @@ import static org.junit.Assert.assertThat; /** * Integration test for performing the unaligned checkpoint. */ -@Ignore("Unstable") public class UnalignedCheckpointITCase extends TestLogger { public static final String NUM_COMPLETED_CHECKPOINTS = "numCompletedCheckpoints"; @@ -106,6 +106,7 @@ public class UnalignedCheckpointITCase extends TestLogger { final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf); env.enableCheckpointing(100); env.getCheckpointConfig().enableUnalignedCheckpoints(); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100))); return env; } @@ -140,10 +141,12 @@ public class UnalignedCheckpointITCase extends TestLogger { public void run(SourceContext ctx) throws Exception { int counter = 0; while (running) { - ctx.collect(counter++); + synchronized (ctx.getCheckpointLock()) { + ctx.collect(counter++); - if (numCompletedCheckpoints.getLocalValue() >= minCheckpoints) { - cancel(); + if (numCompletedCheckpoints.getLocalValue() >= minCheckpoints) { + cancel(); + } } }