提交 300263e1 编写于 作者: I ifndef-SleePy 提交者: Piotr Nowojski

[hotfix] Harden ResumeCheckpointManuallyITCase

The way of detecting external checkpoint is done or not is not strict.
The finalization of checkpoint might be interrupted by the cancallation.
上级 1a138626
......@@ -29,6 +29,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
......@@ -297,13 +298,15 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient<?> client) throws Exception {
JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
NotifyingInfiniteTupleSource.checkpointCompletedLatch = new CountDownLatch(PARALLELISM);
ClientUtils.submitJob(client, initialJobGraph);
// wait until all sources have been started
NotifyingInfiniteTupleSource.countDownLatch.await();
// wait the checkpoint completing
NotifyingInfiniteTupleSource.checkpointCompletedLatch.await();
waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID());
client.cancel(initialJobGraph.getJobID()).get();
waitUntilCanceled(initialJobGraph.getJobID(), client);
......@@ -319,16 +322,6 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
}
}
private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException {
while (true) {
Thread.sleep(50);
Optional<Path> externalizedCheckpoint = findExternalizedCheckpoint(checkpointDir, jobId);
if (externalizedCheckpoint.isPresent()) {
break;
}
}
}
private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException {
try (Stream<Path> checkpoints = Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
return checkpoints
......@@ -380,12 +373,16 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
/**
* Infinite source which notifies when all of its sub tasks have been started via the count down latch.
*/
public static class NotifyingInfiniteTupleSource extends ManualWindowSpeedITCase.InfiniteTupleSource {
public static class NotifyingInfiniteTupleSource
extends ManualWindowSpeedITCase.InfiniteTupleSource
implements CheckpointListener {
private static final long serialVersionUID = 8120981235081181746L;
private static CountDownLatch countDownLatch;
private static CountDownLatch checkpointCompletedLatch;
public NotifyingInfiniteTupleSource(int numKeys) {
super(numKeys);
}
......@@ -398,5 +395,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
super.run(out);
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (checkpointCompletedLatch != null) {
checkpointCompletedLatch.countDown();
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册