diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index f9411372c7cd03b51eb8fdbce52b53f018112da2..ebd57a37a7c1691d210a8c5e5d9b3b6ef29da12c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -29,7 +29,6 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -298,15 +297,13 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient client) throws Exception { JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint); NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM); - NotifyingInfiniteTupleSource.checkpointCompletedLatch = new CountDownLatch(PARALLELISM); ClientUtils.submitJob(client, initialJobGraph); // wait until all sources have been started NotifyingInfiniteTupleSource.countDownLatch.await(); - // wait the checkpoint completing - NotifyingInfiniteTupleSource.checkpointCompletedLatch.await(); + waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID()); client.cancel(initialJobGraph.getJobID()).get(); waitUntilCanceled(initialJobGraph.getJobID(), client); @@ -322,6 +319,16 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { } } + private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException { + while (true) { + Thread.sleep(50); + Optional externalizedCheckpoint = findExternalizedCheckpoint(checkpointDir, jobId); + if (externalizedCheckpoint.isPresent()) { + break; + } + } + } + private static Optional findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException { try (Stream checkpoints = Files.list(checkpointDir.toPath().resolve(jobId.toString()))) { return checkpoints @@ -373,16 +380,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { /** * Infinite source which notifies when all of its sub tasks have been started via the count down latch. */ - public static class NotifyingInfiniteTupleSource - extends ManualWindowSpeedITCase.InfiniteTupleSource - implements CheckpointListener { + public static class NotifyingInfiniteTupleSource extends ManualWindowSpeedITCase.InfiniteTupleSource { private static final long serialVersionUID = 8120981235081181746L; private static CountDownLatch countDownLatch; - private static CountDownLatch checkpointCompletedLatch; - public NotifyingInfiniteTupleSource(int numKeys) { super(numKeys); } @@ -395,12 +398,5 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { super.run(out); } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - if (checkpointCompletedLatch != null) { - checkpointCompletedLatch.countDown(); - } - } } }