提交 e217a8c6 编写于 作者: I ifndef-SleePy 提交者: Piotr Nowojski

Revert "[hotfix] Harden ResumeCheckpointManuallyITCase"

This reverts commit 300263e1.
上级 6adbe94b
...@@ -29,7 +29,6 @@ import org.apache.flink.configuration.HighAvailabilityOptions; ...@@ -29,7 +29,6 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
...@@ -298,15 +297,13 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { ...@@ -298,15 +297,13 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient<?> client) throws Exception { private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient<?> client) throws Exception {
JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint); JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM); NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
NotifyingInfiniteTupleSource.checkpointCompletedLatch = new CountDownLatch(PARALLELISM);
ClientUtils.submitJob(client, initialJobGraph); ClientUtils.submitJob(client, initialJobGraph);
// wait until all sources have been started // wait until all sources have been started
NotifyingInfiniteTupleSource.countDownLatch.await(); NotifyingInfiniteTupleSource.countDownLatch.await();
// wait the checkpoint completing
NotifyingInfiniteTupleSource.checkpointCompletedLatch.await();
waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID());
client.cancel(initialJobGraph.getJobID()).get(); client.cancel(initialJobGraph.getJobID()).get();
waitUntilCanceled(initialJobGraph.getJobID(), client); waitUntilCanceled(initialJobGraph.getJobID(), client);
...@@ -322,6 +319,16 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { ...@@ -322,6 +319,16 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
} }
} }
private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException {
while (true) {
Thread.sleep(50);
Optional<Path> externalizedCheckpoint = findExternalizedCheckpoint(checkpointDir, jobId);
if (externalizedCheckpoint.isPresent()) {
break;
}
}
}
private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException { private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException {
try (Stream<Path> checkpoints = Files.list(checkpointDir.toPath().resolve(jobId.toString()))) { try (Stream<Path> checkpoints = Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
return checkpoints return checkpoints
...@@ -373,16 +380,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { ...@@ -373,16 +380,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
/** /**
* Infinite source which notifies when all of its sub tasks have been started via the count down latch. * Infinite source which notifies when all of its sub tasks have been started via the count down latch.
*/ */
public static class NotifyingInfiniteTupleSource public static class NotifyingInfiniteTupleSource extends ManualWindowSpeedITCase.InfiniteTupleSource {
extends ManualWindowSpeedITCase.InfiniteTupleSource
implements CheckpointListener {
private static final long serialVersionUID = 8120981235081181746L; private static final long serialVersionUID = 8120981235081181746L;
private static CountDownLatch countDownLatch; private static CountDownLatch countDownLatch;
private static CountDownLatch checkpointCompletedLatch;
public NotifyingInfiniteTupleSource(int numKeys) { public NotifyingInfiniteTupleSource(int numKeys) {
super(numKeys); super(numKeys);
} }
...@@ -395,12 +398,5 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { ...@@ -395,12 +398,5 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
super.run(out); super.run(out);
} }
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (checkpointCompletedLatch != null) {
checkpointCompletedLatch.countDown();
}
}
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册