diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 8ee4973c96bcebf617e06e0c39d995f64a019a1f..6064899f35e520c53a630c4c7580d09a86b8e657 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -148,7 +148,8 @@ public class JobManagerHARecoveryTest { mySubmittedJobGraphStore, new StandaloneCheckpointRecoveryFactory(), new SavepointStore(new HeapStateStore()), - jobRecoveryTimeout); + jobRecoveryTimeout, + Option.apply(null)); jobManager = system.actorOf(jobManagerProps, "jobmanager"); ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java index 5759888bedb75caa775fe6607fbb26011d436a0d..077f3dc198961eefe40879e7a8286d1cc5297c3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -74,7 +73,7 @@ public class JobManagerMetricTest { flink.submitJobDetached(jobGraph); Future jobRunning = flink.getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft()); + .ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), deadline.timeLeft()); Await.ready(jobRunning, deadline.timeLeft()); MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index afc46a7cb28750a63c1b66d9dc63106aa89dbc81..6b64d6f0e2fdbf3c7b6c24b3a2dbaeab611db324 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -52,6 +52,7 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -198,7 +199,8 @@ public class JobManagerLeaderElectionTest extends TestLogger { submittedJobGraphStore, checkpointRecoveryFactory, savepointStore, - AkkaUtils.getDefaultTimeout() + AkkaUtils.getDefaultTimeout(), + Option.apply(null) ); } }