From 6420c1c264ed3ce0c32ba164c2cdb85ccdccf265 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 9 Jun 2016 11:37:14 +0200 Subject: [PATCH] [FLINK-3800] [runtime] Introduce SUSPENDED job status The SUSPENDED job status is a new ExecutionGraph state which can be reached from all non-terminal states when calling suspend on the ExecutionGraph. Unlike the FAILED, FINISHED and CANCELED state, the SUSPENDED state does not trigger the deletion of the job from the HA storage. Therefore, this state can be used to handle the loss of leadership or the shutdown of a JobManager so that the ExecutionGraph is stopped but can still be recovered. SUSPENDED is also a terminal state but it can be differentiated as a locally terminal state from FAILED, CANCELED and FINISHED which are globally terminal states. Add test case for suspend signal Add test case for suspending restarting job Add test case for HA job recovery when losing leadership Add online documentation for the job status Add ASF license header to job_status.svg Not throw exception when calling ExecutionGraph.restart and job is in state SUSPENDED This closes #2096. --- docs/internals/fig/job_status.svg | 973 ++++++++++++++++++ docs/internals/job_scheduling.md | 23 +- .../webmonitor/BackPressureStatsTracker.java | 4 +- .../handlers/JobDetailsHandler.java | 2 +- .../runtime/checkpoint/HeapStateStore.java | 2 +- .../executiongraph/ExecutionGraph.java | 80 +- .../restart/FixedDelayRestartStrategy.java | 8 +- .../restart/NoRestartStrategy.java | 3 - .../restart/RestartStrategy.java | 5 - .../flink/runtime/jobgraph/JobStatus.java | 38 +- .../runtime/webmonitor/WebMonitorUtils.java | 2 +- .../flink/runtime/jobmanager/JobManager.scala | 27 +- .../ExecutionGraphRestartTest.java | 121 +++ .../ExecutionGraphSignalsTest.java | 66 ++ .../jobmanager/JobManagerHARecoveryTest.java | 310 ++++++ .../LeaderChangeJobRecoveryTest.java | 10 +- .../LeaderChangeStateCleanupTest.java | 15 +- ...LeaderElectionRetrievalTestingCluster.java | 15 +- .../runtime/taskmanager/TaskCancelTest.java | 2 +- .../testutils/JobManagerActorTestUtils.java | 2 +- .../runtime/testingUtils/TestingCluster.scala | 4 - .../JobManagerCommunicationUtils.java | 2 +- .../JobManagerHAJobGraphRecoveryITCase.java | 51 +- .../apache/flink/yarn/YarnJobManager.scala | 2 +- 24 files changed, 1660 insertions(+), 107 deletions(-) create mode 100644 docs/internals/fig/job_status.svg create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java diff --git a/docs/internals/fig/job_status.svg b/docs/internals/fig/job_status.svg new file mode 100644 index 00000000000..a1093bcda1d --- /dev/null +++ b/docs/internals/fig/job_status.svg @@ -0,0 +1,973 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + image/svg+xml + + + + + + + + + Created + + + + Running + + + + Finished + + + + Failing + + + + Failed + + + + Cancelling + + + + Canceled + + + + Restarting + + + + Suspended + + + + + + + + + Schedule job + All job vertices in final state + All job vertices in final state + All job vertices in final state & not restartable + Fail job + Cancel job + + + + + + + + + + Restarted job + Suspend job + Suspend job + Suspend job + Suspend job + Cancel job + Fail job + Cancel job + All job verticesin final state &restartable + Fail job + + + diff --git a/docs/internals/job_scheduling.md b/docs/internals/job_scheduling.md index 1e1da97f0d5..9163678f074 100644 --- a/docs/internals/job_scheduling.md +++ b/docs/internals/job_scheduling.md @@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains the {% gh_link /flink-run JobGraph and ExecutionGraph -During its execution, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the +Each ExecutionGraph has a job status associated with it. +This job status indicates the current state of the job execution. + +A Flink job is first in the *created* state, then switches to *running* and upon completion of all work it switches to *finished*. +In case of failures, a job switches first to *failing* where it cancels all running tasks. +If all job vertices have reached a final state and the job is not restartable, then the job transitions to *failed*. +If the job can be restarted, then it will enter the *restarting* state. +Once the job has been completely restarted, it will reach the *created* state. + +In case that the user cancels the job, it will go into the *cancelling* state. +This also entails the cancellation of all currently running tasks. +Once all running tasks have reached a final state, the job transitions to the state *cancelled*. + +Unlike the states *finished*, *canceled* and *failed* which denote a globally terminal state and, thus, trigger the clean up of the job, the *suspended* state is only locally terminal. +Locally terminal means that the execution of the job has been terminated on the respective JobManager but another JobManager of the Flink cluster can retrieve the job from the persistent HA store and restart it. +Consequently, a job which reaches the *suspended* state won't be completely cleaned up. + +
+States and Transitions of Flink job +
+ +During the execution of the ExecutionGraph, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the states and possible transitions between them. A task may be executed multiple times (for example in the course of failure recovery). For that reason, the execution of an ExecutionVertex is tracked in an {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java "Execution" %}. Each ExecutionVertex has a current Execution, and prior Executions. diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java index 34d8069bcc4..f890106c4fd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java @@ -163,7 +163,7 @@ public class BackPressureStatsTracker { } if (!pendingStats.contains(vertex) && - !vertex.getGraph().getState().isTerminalState()) { + !vertex.getGraph().getState().isGloballyTerminalState()) { ExecutionContext executionContext = vertex.getGraph().getExecutionContext(); @@ -245,7 +245,7 @@ public class BackPressureStatsTracker { // Job finished, ignore. JobStatus jobState = vertex.getGraph().getState(); - if (jobState.isTerminalState()) { + if (jobState.isGloballyTerminalState()) { LOG.debug("Ignoring sample, because job is in state " + jobState + "."); } else if (success != null) { OperatorBackPressureStats stats = createStatsFromSample(success); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 4f311285344..884b859f66f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -66,7 +66,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { // times and duration final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED); - final long jobEndTime = graph.getState().isTerminalState() ? + final long jobEndTime = graph.getState().isGloballyTerminalState() ? graph.getStatusTimestamp(graph.getState()) : -1L; gen.writeNumberField("start-time", jobStartTime); gen.writeNumberField("end-time", jobEndTime); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java index a0b3804110c..c679d50c0c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java @@ -31,7 +31,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * @param Type of state */ -class HeapStateStore implements StateStore { +public class HeapStateStore implements StateStore { private final ConcurrentMap stateMap = new ConcurrentHashMap<>(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index b11f51d3475..e52188806e9 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -797,18 +797,50 @@ public class ExecutionGraph implements Serializable { } } - public void fail(Throwable t) { - if (t instanceof SuppressRestartsException) { - if (restartStrategy != null) { - // disable the restart strategy in case that we have seen a SuppressRestartsException - // it basically overrides the restart behaviour of a the root cause - restartStrategy.disable(); + /** + * Suspends the current ExecutionGraph. + * + * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal + * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed. + * + * The SUSPENDED state is a local terminal state which stops the execution of the job but does + * not remove the job from the HA job store so that it can be recovered by another JobManager. + * + * @param suspensionCause Cause of the suspension + */ + public void suspend(Throwable suspensionCause) { + while (true) { + JobStatus currentState = state; + + if (currentState.isGloballyTerminalState()) { + // stay in a terminal state + return; + } else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) { + this.failureCause = suspensionCause; + + for (ExecutionJobVertex ejv: verticesInCreationOrder) { + ejv.cancel(); + } + + synchronized (progressLock) { + postRunCleanup(); + progressLock.notifyAll(); + + LOG.info("Job {} has been suspended.", getJobID()); + } + + return; } } + } + public void fail(Throwable t) { while (true) { JobStatus current = state; - if (current == JobStatus.FAILING || current.isTerminalState()) { + // stay in these states + if (current == JobStatus.FAILING || + current == JobStatus.SUSPENDED || + current.isGloballyTerminalState()) { return; } else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) { synchronized (progressLock) { @@ -849,6 +881,9 @@ public class ExecutionGraph implements Serializable { } else if (current == JobStatus.FAILED) { LOG.info("Failed job during restart. Aborting restart."); return; + } else if (current == JobStatus.SUSPENDED) { + LOG.info("Suspended job during restart. Aborting restart."); + return; } else if (current != JobStatus.RESTARTING) { throw new IllegalStateException("Can only restart job from state restarting."); } @@ -947,7 +982,7 @@ public class ExecutionGraph implements Serializable { * This method cleans fields that are irrelevant for the archived execution attempt. */ public void prepareForArchiving() { - if (!state.isTerminalState()) { + if (!state.isGloballyTerminalState()) { throw new IllegalStateException("Can only archive the job from a terminal state"); } @@ -984,7 +1019,7 @@ public class ExecutionGraph implements Serializable { */ public void waitUntilFinished() throws InterruptedException { synchronized (progressLock) { - while (!state.isTerminalState()) { + while (!state.isGloballyTerminalState()) { progressLock.wait(); } } @@ -1037,23 +1072,28 @@ public class ExecutionGraph implements Serializable { } } else if (current == JobStatus.FAILING) { - if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) { - // double check in case that in the meantime a SuppressRestartsException was thrown - if (restartStrategy.canRestart()) { - restartStrategy.restart(this); - break; - } else { - fail(new Exception("ExecutionGraph went into RESTARTING state but " + - "then the restart strategy was disabled.")); - } - - } else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) { + boolean allowRestart = !(failureCause instanceof SuppressRestartsException); + + if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) { + restartStrategy.restart(this); + break; + } else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) { postRunCleanup(); break; } } + else if (current == JobStatus.SUSPENDED) { + // we've already cleaned up when entering the SUSPENDED state + break; + } + else if (current.isGloballyTerminalState()) { + LOG.warn("Job has entered globally terminal state without waiting for all " + + "job vertices to reach final state."); + break; + } else { fail(new Exception("ExecutionGraph went into final state from state " + current)); + break; } } // done transitioning the state diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java index 3406f4ee42f..ac06379b1bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java @@ -41,7 +41,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy { private final int maxNumberRestartAttempts; private final long delayBetweenRestartAttempts; private int currentRestartAttempt; - private boolean disabled = false; public FixedDelayRestartStrategy( int maxNumberRestartAttempts, @@ -61,7 +60,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { @Override public boolean canRestart() { - return !disabled && currentRestartAttempt < maxNumberRestartAttempts; + return currentRestartAttempt < maxNumberRestartAttempts; } @Override @@ -84,11 +83,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy { }, executionGraph.getExecutionContext()); } - @Override - public void disable() { - disabled = true; - } - /** * Creates a FixedDelayRestartStrategy from the given Configuration. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java index 6cc5ee4b820..958d9ac0fab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java @@ -36,9 +36,6 @@ public class NoRestartStrategy implements RestartStrategy { throw new RuntimeException("NoRestartStrategy does not support restart."); } - @Override - public void disable() {} - /** * Creates a NoRestartStrategy instance. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java index c9e6277e64c..2880c0128f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java @@ -38,9 +38,4 @@ public interface RestartStrategy { * @param executionGraph The ExecutionGraph to be restarted */ void restart(ExecutionGraph executionGraph); - - /** - * Disables the restart strategy. - */ - void disable(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java index eb7d017c49a..52a2abe1b32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java @@ -24,38 +24,52 @@ package org.apache.flink.runtime.jobgraph; public enum JobStatus { /** Job is newly created, no task has started to run. */ - CREATED(false), + CREATED(TerminalState.NON_TERMINAL), /** Some tasks are scheduled or running, some may be pending, some may be finished. */ - RUNNING(false), + RUNNING(TerminalState.NON_TERMINAL), /** The job has failed and is currently waiting for the cleanup to complete */ - FAILING(false), + FAILING(TerminalState.NON_TERMINAL), /** The job has failed with a non-recoverable task failure */ - FAILED(true), + FAILED(TerminalState.GLOBALLY), /** Job is being cancelled */ - CANCELLING(false), + CANCELLING(TerminalState.NON_TERMINAL), /** Job has been cancelled */ - CANCELED(true), + CANCELED(TerminalState.GLOBALLY), /** All of the job's tasks have successfully finished. */ - FINISHED(true), + FINISHED(TerminalState.GLOBALLY), /** The job is currently undergoing a reset and total restart */ - RESTARTING(false); + RESTARTING(TerminalState.NON_TERMINAL), + + /** + * The job has been suspended which means that it has been stopped but not been removed from a + * potential HA job store. + */ + SUSPENDED(TerminalState.LOCALLY); // -------------------------------------------------------------------------------------------- + + enum TerminalState { + NON_TERMINAL, + LOCALLY, + GLOBALLY + } - private final boolean terminalState; + private final TerminalState terminalState; - JobStatus(boolean terminalState) { + JobStatus(TerminalState terminalState) { this.terminalState = terminalState; } - public boolean isTerminalState() { - return terminalState; + public boolean isGloballyTerminalState() { + return terminalState == TerminalState.GLOBALLY; } } + + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 6d89de013e5..37a91b3a548 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -168,7 +168,7 @@ public final class WebMonitorUtils { JobStatus status = job.getState(); long started = job.getStatusTimestamp(JobStatus.CREATED); - long finished = status.isTerminalState() ? job.getStatusTimestamp(status) : -1L; + long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L; int[] countsPerStatus = new int[ExecutionState.values().length]; long lastChanged = 0; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 8ab887a5ccb..46f7ed21e8d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -210,8 +210,7 @@ class JobManager( log.info(s"Stopping JobManager $getAddress.") val newFuturesToComplete = cancelAndClearEverything( - new Exception("The JobManager is shutting down."), - removeJobFromStateBackend = true) + new Exception("The JobManager is shutting down.")) implicit val executionContext = context.dispatcher @@ -307,8 +306,7 @@ class JobManager( log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.") val newFuturesToComplete = cancelAndClearEverything( - new Exception("JobManager is no longer the leader."), - removeJobFromStateBackend = false) + new Exception("JobManager is no longer the leader.")) futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete) @@ -746,7 +744,7 @@ class JobManager( s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.", error) - if (newJobStatus.isTerminalState()) { + if (newJobStatus.isGloballyTerminalState()) { jobInfo.end = timeStamp future{ @@ -951,7 +949,7 @@ class JobManager( case RemoveCachedJob(jobID) => currentJobs.get(jobID) match { case Some((graph, info)) => - if (graph.getState.isTerminalState) { + if (graph.getState.isGloballyTerminalState) { removeJob(graph.getJobID, removeJobFromStateBackend = true) match { case Some(futureToComplete) => futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete) @@ -1632,23 +1630,11 @@ class JobManager( * * @param cause Cause for the cancelling. */ - private def cancelAndClearEverything( - cause: Throwable, - removeJobFromStateBackend: Boolean) + private def cancelAndClearEverything(cause: Throwable) : Seq[Future[Unit]] = { val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield { future { - if (removeJobFromStateBackend) { - try { - submittedJobGraphs.removeJobGraph(jobID) - } - catch { - case t: Throwable => - log.error("Error during submitted job graph clean up.", t) - } - } - - eg.fail(cause) + eg.suspend(cause) if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { jobInfo.client ! decorateMessage( @@ -1667,7 +1653,6 @@ class JobManager( } override def revokeLeadership(): Unit = { - leaderSessionID = None self ! decorateMessage(RevokeLeadership) } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 687a46afe50..26ba04fd647 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import akka.dispatch.Futures; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; @@ -40,10 +41,14 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; +import scala.concurrent.impl.Promise; import java.util.Iterator; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; @@ -667,6 +672,122 @@ public class ExecutionGraphRestartTest extends TestLogger { assertEquals(JobStatus.RESTARTING, eg.getState()); } + /** + * Tests that a suspend call while restarting a job, will abort the restarting. + * + * @throws Exception + */ + @Test + public void testSuspendWhileRestarting() throws Exception { + FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES); + Deadline deadline = timeout.fromNow(); + + Instance instance = ExecutionGraphTestUtils.getInstance( + new SimpleActorGateway(TestingUtils.directExecutionContext()), + NUM_TASKS); + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + scheduler.newInstanceAvailable(instance); + + JobVertex sender = new JobVertex("Task"); + sender.setInvokableClass(Tasks.NoOpInvokable.class); + sender.setParallelism(NUM_TASKS); + + JobGraph jobGraph = new JobGraph("Pointwise job", sender); + + ControllableRestartStrategy controllableRestartStrategy = new ControllableRestartStrategy(timeout); + + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + new JobID(), + "Test job", + new Configuration(), + ExecutionConfigTest.getSerializedConfig(), + AkkaUtils.getDefaultTimeout(), + controllableRestartStrategy); + + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + assertEquals(JobStatus.CREATED, eg.getState()); + + eg.scheduleForExecution(scheduler); + + assertEquals(JobStatus.RUNNING, eg.getState()); + + instance.markDead(); + + Await.ready(controllableRestartStrategy.getReachedCanRestart(), deadline.timeLeft()); + + assertEquals(JobStatus.RESTARTING, eg.getState()); + + eg.suspend(new Exception("Test exception")); + + assertEquals(JobStatus.SUSPENDED, eg.getState()); + + controllableRestartStrategy.unlockRestart(); + + Await.ready(controllableRestartStrategy.getRestartDone(), deadline.timeLeft()); + + assertEquals(JobStatus.SUSPENDED, eg.getState()); + } + + private static class ControllableRestartStrategy implements RestartStrategy { + + private Promise reachedCanRestart = new Promise.DefaultPromise<>(); + private Promise doRestart = new Promise.DefaultPromise<>(); + private Promise restartDone = new Promise.DefaultPromise<>(); + + private volatile Exception exception = null; + + private FiniteDuration timeout; + + public ControllableRestartStrategy(FiniteDuration timeout) { + this.timeout = timeout; + } + + public void unlockRestart() { + doRestart.success(true); + } + + public Exception getException() { + return exception; + } + + public Future getReachedCanRestart() { + return reachedCanRestart.future(); + } + + public Future getRestartDone() { + return restartDone.future(); + } + + @Override + public boolean canRestart() { + reachedCanRestart.success(true); + return true; + } + + @Override + public void restart(final ExecutionGraph executionGraph) { + Futures.future(new Callable() { + @Override + public Object call() throws Exception { + try { + + Await.ready(doRestart.future(), timeout); + executionGraph.restart(); + } catch (Exception e) { + exception = e; + } + + restartDone.success(true); + + return null; + } + }, TestingUtils.defaultExecutionContext()); + } + } + private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException { eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception")); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index 8b04fa39e9b..bcee5a10ad8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -196,7 +196,73 @@ public class ExecutionGraphSignalsTest { for (int i = 0; i < mockEJV.length; ++i) { verify(mockEJV[i], times(times)).cancel(); } + } + + /** + * Tests that suspend cancels the ExecutionJobVertices and transitions to SUSPENDED state. + * Tests also that one cannot leave the SUSPENDED state to enter a terminal state. + */ + @Test + public void testSuspend() throws Exception { + Assert.assertEquals(JobStatus.CREATED, eg.getState()); + Exception testException = new Exception("Test exception"); + + eg.suspend(testException); + + verifyCancel(1); + Assert.assertEquals(JobStatus.SUSPENDED, eg.getState()); + + f.set(eg, JobStatus.RUNNING); + + eg.suspend(testException); + + verifyCancel(2); + Assert.assertEquals(JobStatus.SUSPENDED, eg.getState()); + + f.set(eg, JobStatus.FAILING); + + eg.suspend(testException); + + verifyCancel(3); + Assert.assertEquals(JobStatus.SUSPENDED, eg.getState()); + + f.set(eg, JobStatus.CANCELLING); + + eg.suspend(testException); + + verifyCancel(4); + Assert.assertEquals(JobStatus.SUSPENDED, eg.getState()); + + f.set(eg, JobStatus.FAILED); + + eg.suspend(testException); + + verifyCancel(4); + Assert.assertEquals(JobStatus.FAILED, eg.getState()); + + f.set(eg, JobStatus.FINISHED); + + eg.suspend(testException); + + verifyCancel(4); + Assert.assertEquals(JobStatus.FINISHED, eg.getState()); + + f.set(eg, JobStatus.CANCELED); + + eg.suspend(testException); + + verifyCancel(4); + Assert.assertEquals(JobStatus.CANCELED, eg.getState()); + + f.set(eg, JobStatus.SUSPENDED); + + eg.fail(testException); + + Assert.assertEquals(JobStatus.SUSPENDED, eg.getState()); + + eg.cancel(); + Assert.assertEquals(JobStatus.SUSPENDED, eg.getState()); } // test that all source tasks receive STOP signal diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java new file mode 100644 index 00000000000..8ee4973c96b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.checkpoint.HeapStateStore; +import org.apache.flink.runtime.checkpoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.instance.InstanceManager; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testingUtils.TestingJobManager; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingMessages; +import org.apache.flink.runtime.testingUtils.TestingTaskManager; +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.Int; +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class JobManagerHARecoveryTest { + + private static ActorSystem system; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** + * Tests that the persisted job is not removed from the SubmittedJobGraphStore if the JobManager + * loses its leadership. Furthermore, it tests that the job manager can recover the job from + * the SubmittedJobGraphStore. + * + * @throws Exception + */ + @Test + public void testJobRecoveryWhenLosingLeadership() throws Exception { + + FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS); + Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); + Configuration flinkConfiguration = new Configuration(); + UUID leaderSessionID = UUID.randomUUID(); + UUID newLeaderSessionID = UUID.randomUUID(); + int slots = 2; + ActorRef archive = null; + ActorRef jobManager = null; + ActorRef taskManager = null; + + flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.newFolder().toString()); + flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots); + + try { + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + + MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore(); + TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService(); + + InstanceManager instanceManager = new InstanceManager(); + instanceManager.addInstanceListener(scheduler); + + archive = system.actorOf(Props.create( + MemoryArchivist.class, + 10), "archive"); + + Props jobManagerProps = Props.create( + TestingJobManager.class, + flinkConfiguration, + new ForkJoinPool(), + instanceManager, + scheduler, + new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000), + archive, + new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), + timeout, + myLeaderElectionService, + mySubmittedJobGraphStore, + new StandaloneCheckpointRecoveryFactory(), + new SavepointStore(new HeapStateStore()), + jobRecoveryTimeout); + + jobManager = system.actorOf(jobManagerProps, "jobmanager"); + ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID); + + taskManager = TaskManager.startTaskManagerComponentsAndActor( + flinkConfiguration, + ResourceID.generate(), + system, + "localhost", + Option.apply("taskmanager"), + Option.apply((LeaderRetrievalService) myLeaderRetrievalService), + true, + TestingTaskManager.class); + + ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID); + + Future tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft()); + + Await.ready(tmAlive, deadline.timeLeft()); + + JobVertex sourceJobVertex = new JobVertex("Source"); + sourceJobVertex.setInvokableClass(BlockingInvokable.class); + sourceJobVertex.setParallelism(slots); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); + + Future isLeader = gateway.ask( + TestingJobManagerMessages.getNotifyWhenLeader(), + deadline.timeLeft()); + + Future isConnectedToJobManager = tmGateway.ask( + new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager), + deadline.timeLeft()); + + // tell jobManager that he's the leader + myLeaderElectionService.isLeader(leaderSessionID); + // tell taskManager who's the leader + myLeaderRetrievalService.notifyListener(gateway.path(), leaderSessionID); + + Await.ready(isLeader, deadline.timeLeft()); + Await.ready(isConnectedToJobManager, deadline.timeLeft()); + + // submit blocking job + Future jobSubmitted = gateway.ask( + new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), + deadline.timeLeft()); + + Await.ready(jobSubmitted, deadline.timeLeft()); + + Future jobRemoved = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft()); + + // Revoke leadership + myLeaderElectionService.notLeader(); + + // check that the job gets removed from the JobManager + Await.ready(jobRemoved, deadline.timeLeft()); + // but stays in the submitted job graph store + assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID())); + + Future jobRunning = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft()); + + // Make JobManager again a leader + myLeaderElectionService.isLeader(newLeaderSessionID); + // tell the TaskManager about it + myLeaderRetrievalService.notifyListener(gateway.path(), newLeaderSessionID); + + // wait that the job is recovered and reaches state RUNNING + Await.ready(jobRunning, deadline.timeLeft()); + + Future jobFinished = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft()); + + BlockingInvokable.unblock(); + + // wait til the job has finished + Await.ready(jobFinished, deadline.timeLeft()); + + // check that the job has been removed from the submitted job graph store + assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID())); + } finally { + if (archive != null) { + archive.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (jobManager != null) { + jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (taskManager != null) { + taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + } + } + + static class MySubmittedJobGraphStore implements SubmittedJobGraphStore { + Map storedJobs = new HashMap<>(); + + @Override + public void start(SubmittedJobGraphListener jobGraphListener) throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public List recoverJobGraphs() throws Exception { + return new ArrayList<>(storedJobs.values()); + } + + @Override + public Option recoverJobGraph(JobID jobId) throws Exception { + if (storedJobs.containsKey(jobId)) { + return Option.apply(storedJobs.get(jobId)); + } else { + return Option.apply(null); + } + } + + @Override + public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { + storedJobs.put(jobGraph.getJobId(), jobGraph); + } + + @Override + public void removeJobGraph(JobID jobId) throws Exception { + storedJobs.remove(jobId); + } + + boolean contains(JobID jobId) { + return storedJobs.containsKey(jobId); + } + } + + public static class BlockingInvokable extends AbstractInvokable { + + private static boolean blocking = true; + private static Object lock = new Object(); + + @Override + public void invoke() throws Exception { + while(blocking) { + synchronized (lock) { + lock.wait(); + } + } + } + + public static void unblock() { + blocking = false; + + synchronized (lock) { + lock.notifyAll(); + } + } + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java index ccd21562002..57de2cdc446 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java @@ -23,11 +23,11 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -70,7 +70,11 @@ public class LeaderChangeJobRecoveryTest extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); - cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, new FixedDelayRestartStrategy(9999, 100)); + configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay"); + configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 milli"); + + cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false); cluster.start(false); // wait for actors to be alive so that they have started their leader retrieval service @@ -170,7 +174,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger { if (message instanceof ExecutionGraphMessages.JobStatusChanged) { ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message; - if (jobStatusChanged.newJobStatus().isTerminalState()) { + if (jobStatusChanged.newJobStatus().isGloballyTerminalState() || jobStatusChanged.newJobStatus() == JobStatus.SUSPENDED) { terminalState.success(true); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java index 89f462c5d8d..19cc4442297 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.leaderelection; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.instance.ActorGateway; @@ -35,6 +34,8 @@ import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -47,6 +48,8 @@ import static org.junit.Assert.*; public class LeaderChangeStateCleanupTest extends TestLogger { + private static Logger LOG = LoggerFactory.getLogger(LeaderChangeStateCleanupTest.class); + private static FiniteDuration timeout = TestingUtils.TESTING_DURATION(); private int numJMs = 2; @@ -68,7 +71,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); - cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, null); + cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false); cluster.start(false); // TaskManagers don't have to register at the JobManager cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive @@ -225,11 +228,15 @@ public class LeaderChangeStateCleanupTest extends TestLogger { Future jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout); - // make JM(0) again the leader --> this implies first a leadership revokal + LOG.info("Make JM(0) again the leader. This should first revoke the leadership."); + + // make JM(0) again the leader --> this implies first a leadership revocation cluster.grantLeadership(0, newLeaderSessionID); Await.ready(jobRemoval, timeout); + LOG.info("Job removed."); + // The TMs should not be able to reconnect since they don't know the current leader // session ID try { @@ -239,6 +246,8 @@ public class LeaderChangeStateCleanupTest extends TestLogger { // expected exception since the TMs have still the old leader session ID } + LOG.info("Notify TMs about the new (old) leader."); + // notify the TMs about the new (old) leader cluster.notifyRetrievalListeners(0, newLeaderSessionID); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java index cd89fa69fea..e596166452c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java @@ -39,7 +39,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { private final Configuration userConfiguration; private final boolean useSingleActorSystem; - private final RestartStrategy restartStrategy; public List leaderElectionServices; public List leaderRetrievalServices; @@ -49,8 +48,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { public LeaderElectionRetrievalTestingCluster( Configuration userConfiguration, boolean singleActorSystem, - boolean synchronousDispatcher, - RestartStrategy restartStrategy) { + boolean synchronousDispatcher) { super(userConfiguration, singleActorSystem, synchronousDispatcher); this.userConfiguration = userConfiguration; @@ -58,8 +56,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { leaderElectionServices = new ArrayList(); leaderRetrievalServices = new ArrayList(); - - this.restartStrategy = restartStrategy; } @Override @@ -95,15 +91,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER); } - @Override - public RestartStrategy getRestartStrategy(RestartStrategy other) { - if (this.restartStrategy != null) { - return this.restartStrategy; - } else { - return other; - } - } - public void grantLeadership(int index, UUID leaderSessionID) { if(leaderIndex >= 0) { // first revoke leadership diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java index 690c042cca7..64b25e15edc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java @@ -201,7 +201,7 @@ public class TaskCancelTest { if (status.status() == JobStatus.RUNNING) { return; } - else if (status.status().isTerminalState()) { + else if (status.status().isGloballyTerminalState()) { throw new Exception("JobStatus changed to " + status.status() + " while waiting for job to start running."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java index 360cb1f485f..91114bab273 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java @@ -79,7 +79,7 @@ public class JobManagerActorTestUtils { if (jobStatus == expectedJobStatus) { return; } - else if (jobStatus.isTerminalState()) { + else if (jobStatus.isGloballyTerminalState()) { throw new IllegalStateException("Job is in terminal state " + jobStatus + ", " + "but was waiting for " + expectedJobStatus + "."); } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 0c4ffb98570..763bd364d1b 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -186,10 +186,6 @@ class TestingCluster( None } - def getRestartStrategy(restartStrategy: RestartStrategy) = { - restartStrategy - } - @throws(classOf[TimeoutException]) @throws(classOf[InterruptedException]) def waitForTaskManagersToBeAlive(): Unit = { diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java index f2f761d8bff..028045afd12 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java @@ -86,7 +86,7 @@ public class JobManagerCommunicationUtils { if (status == null) { throw new Exception("Could not cancel job - no running jobs"); } - else if (status.getJobState().isTerminalState()) { + else if (status.getJobState().isGloballyTerminalState()) { throw new Exception("Could not cancel job - job is not running any more"); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index b4ffbd4f224..5a10604fff4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -25,7 +25,6 @@ import akka.actor.UntypedActor; import akka.testkit.TestActorRef; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -46,6 +45,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; import org.apache.flink.runtime.testutils.JobManagerProcess; @@ -119,10 +119,10 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { // --------------------------------------------------------------------------------------------- /** - * Tests that the recovery state is cleaned up after a JobManager stops. + * Tests that the HA job is not cleaned up when the jobmanager is stopped. */ @Test - public void testJobManagerCleanUp() throws Exception { + public void testJobPersistencyWhenJobManagerShutdown() throws Exception { Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig( ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath()); @@ -153,8 +153,9 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { flink.shutdown(); } - // Verify that everything is clean - verifyCleanRecoveryState(config); + // verify that the persisted job data has not been removed from ZooKeeper when the JM has + // been shutdown + verifyRecoveryState(config); } /** @@ -225,6 +226,14 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { if (!success) { fail("Non-leading JM was still holding reference to the job graph."); } + + Future jobRemoved = leadingJobManager.ask( + new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), + deadline.timeLeft()); + + leadingJobManager.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID())); + + Await.ready(jobRemoved, deadline.timeLeft()); } finally { flink.shutdown(); @@ -482,4 +491,36 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { } } + /** + * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned. + */ + private static void verifyRecoveryState(Configuration config) throws Exception { + // File state backend empty + Collection stateHandles = FileUtils.listFiles( + FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE); + + if (stateHandles.isEmpty()) { + fail("File state backend has been cleaned: " + stateHandles); + } + + // ZooKeeper + String currentJobsPath = config.getString( + ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); + + Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath); + + if (stat.getCversion() == 0) { + // Sanity check: verify that some changes have been performed + fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " + + "this test. What are you testing?"); + } + + if (stat.getNumChildren() == 0) { + // Children have been cleaned up? + fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " + + ZooKeeper.getClient().getChildren().forPath(currentJobsPath)); + } + } + } diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index f291c026d95..23b3adc1d7f 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -150,7 +150,7 @@ class YarnJobManager( log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " + s"job $stopWhenJobFinished") } else { - if (jobStatus.status.isTerminalState) { + if (jobStatus.status.isGloballyTerminalState) { log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " + s"Shutting down YARN session") if (jobStatus.status == JobStatus.FINISHED) { -- GitLab