提交 6420c1c2 编写于 作者: T Till Rohrmann

[FLINK-3800] [runtime] Introduce SUSPENDED job status

The SUSPENDED job status is a new ExecutionGraph state which can be reached from all
non-terminal states when calling suspend on the ExecutionGraph. Unlike the FAILED,
FINISHED and CANCELED state, the SUSPENDED state does not trigger the deletion of the
job from the HA storage. Therefore, this state can be used to handle the loss of
leadership or the shutdown of a JobManager so that the ExecutionGraph is stopped but
can still be recovered. SUSPENDED is also a terminal state but it can be differentiated as
a locally terminal state from FAILED, CANCELED and FINISHED which are globally
terminal states.

Add test case for suspend signal

Add test case for suspending restarting job

Add test case for HA job recovery when losing leadership

Add online documentation for the job status

Add ASF license header to job_status.svg

Not throw exception when calling ExecutionGraph.restart and job is in state SUSPENDED

This closes #2096.
上级 cfe62934
此差异已折叠。
......@@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains the {% gh_link /flink-run
<img src="fig/job_and_execution_graph.svg" alt="JobGraph and ExecutionGraph" height="400px" style="text-align: center;"/>
</div>
During its execution, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the
Each ExecutionGraph has a job status associated with it.
This job status indicates the current state of the job execution.
A Flink job is first in the *created* state, then switches to *running* and upon completion of all work it switches to *finished*.
In case of failures, a job switches first to *failing* where it cancels all running tasks.
If all job vertices have reached a final state and the job is not restartable, then the job transitions to *failed*.
If the job can be restarted, then it will enter the *restarting* state.
Once the job has been completely restarted, it will reach the *created* state.
In case that the user cancels the job, it will go into the *cancelling* state.
This also entails the cancellation of all currently running tasks.
Once all running tasks have reached a final state, the job transitions to the state *cancelled*.
Unlike the states *finished*, *canceled* and *failed* which denote a globally terminal state and, thus, trigger the clean up of the job, the *suspended* state is only locally terminal.
Locally terminal means that the execution of the job has been terminated on the respective JobManager but another JobManager of the Flink cluster can retrieve the job from the persistent HA store and restart it.
Consequently, a job which reaches the *suspended* state won't be completely cleaned up.
<div style="text-align: center;">
<img src="fig/job_status.svg" alt="States and Transitions of Flink job" height="500px" style="text-align: center;"/>
</div>
During the execution of the ExecutionGraph, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the
states and possible transitions between them. A task may be executed multiple times (for example in the course of failure recovery).
For that reason, the execution of an ExecutionVertex is tracked in an {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java "Execution" %}. Each ExecutionVertex has a current Execution, and prior Executions.
......
......@@ -163,7 +163,7 @@ public class BackPressureStatsTracker {
}
if (!pendingStats.contains(vertex) &&
!vertex.getGraph().getState().isTerminalState()) {
!vertex.getGraph().getState().isGloballyTerminalState()) {
ExecutionContext executionContext = vertex.getGraph().getExecutionContext();
......@@ -245,7 +245,7 @@ public class BackPressureStatsTracker {
// Job finished, ignore.
JobStatus jobState = vertex.getGraph().getState();
if (jobState.isTerminalState()) {
if (jobState.isGloballyTerminalState()) {
LOG.debug("Ignoring sample, because job is in state " + jobState + ".");
} else if (success != null) {
OperatorBackPressureStats stats = createStatsFromSample(success);
......
......@@ -66,7 +66,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
// times and duration
final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED);
final long jobEndTime = graph.getState().isTerminalState() ?
final long jobEndTime = graph.getState().isGloballyTerminalState() ?
graph.getStatusTimestamp(graph.getState()) : -1L;
gen.writeNumberField("start-time", jobStartTime);
gen.writeNumberField("end-time", jobEndTime);
......
......@@ -31,7 +31,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* @param <T> Type of state
*/
class HeapStateStore<T extends Serializable> implements StateStore<T> {
public class HeapStateStore<T extends Serializable> implements StateStore<T> {
private final ConcurrentMap<String, T> stateMap = new ConcurrentHashMap<>();
......
......@@ -797,18 +797,50 @@ public class ExecutionGraph implements Serializable {
}
}
public void fail(Throwable t) {
if (t instanceof SuppressRestartsException) {
if (restartStrategy != null) {
// disable the restart strategy in case that we have seen a SuppressRestartsException
// it basically overrides the restart behaviour of a the root cause
restartStrategy.disable();
/**
* Suspends the current ExecutionGraph.
*
* The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
* state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
*
* The SUSPENDED state is a local terminal state which stops the execution of the job but does
* not remove the job from the HA job store so that it can be recovered by another JobManager.
*
* @param suspensionCause Cause of the suspension
*/
public void suspend(Throwable suspensionCause) {
while (true) {
JobStatus currentState = state;
if (currentState.isGloballyTerminalState()) {
// stay in a terminal state
return;
} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
this.failureCause = suspensionCause;
for (ExecutionJobVertex ejv: verticesInCreationOrder) {
ejv.cancel();
}
synchronized (progressLock) {
postRunCleanup();
progressLock.notifyAll();
LOG.info("Job {} has been suspended.", getJobID());
}
return;
}
}
}
public void fail(Throwable t) {
while (true) {
JobStatus current = state;
if (current == JobStatus.FAILING || current.isTerminalState()) {
// stay in these states
if (current == JobStatus.FAILING ||
current == JobStatus.SUSPENDED ||
current.isGloballyTerminalState()) {
return;
} else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
synchronized (progressLock) {
......@@ -849,6 +881,9 @@ public class ExecutionGraph implements Serializable {
} else if (current == JobStatus.FAILED) {
LOG.info("Failed job during restart. Aborting restart.");
return;
} else if (current == JobStatus.SUSPENDED) {
LOG.info("Suspended job during restart. Aborting restart.");
return;
} else if (current != JobStatus.RESTARTING) {
throw new IllegalStateException("Can only restart job from state restarting.");
}
......@@ -947,7 +982,7 @@ public class ExecutionGraph implements Serializable {
* This method cleans fields that are irrelevant for the archived execution attempt.
*/
public void prepareForArchiving() {
if (!state.isTerminalState()) {
if (!state.isGloballyTerminalState()) {
throw new IllegalStateException("Can only archive the job from a terminal state");
}
......@@ -984,7 +1019,7 @@ public class ExecutionGraph implements Serializable {
*/
public void waitUntilFinished() throws InterruptedException {
synchronized (progressLock) {
while (!state.isTerminalState()) {
while (!state.isGloballyTerminalState()) {
progressLock.wait();
}
}
......@@ -1037,23 +1072,28 @@ public class ExecutionGraph implements Serializable {
}
}
else if (current == JobStatus.FAILING) {
if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
// double check in case that in the meantime a SuppressRestartsException was thrown
if (restartStrategy.canRestart()) {
restartStrategy.restart(this);
break;
} else {
fail(new Exception("ExecutionGraph went into RESTARTING state but " +
"then the restart strategy was disabled."));
}
} else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) {
boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
restartStrategy.restart(this);
break;
} else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
postRunCleanup();
break;
}
}
else if (current == JobStatus.SUSPENDED) {
// we've already cleaned up when entering the SUSPENDED state
break;
}
else if (current.isGloballyTerminalState()) {
LOG.warn("Job has entered globally terminal state without waiting for all " +
"job vertices to reach final state.");
break;
}
else {
fail(new Exception("ExecutionGraph went into final state from state " + current));
break;
}
}
// done transitioning the state
......
......@@ -41,7 +41,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
private final int maxNumberRestartAttempts;
private final long delayBetweenRestartAttempts;
private int currentRestartAttempt;
private boolean disabled = false;
public FixedDelayRestartStrategy(
int maxNumberRestartAttempts,
......@@ -61,7 +60,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
@Override
public boolean canRestart() {
return !disabled && currentRestartAttempt < maxNumberRestartAttempts;
return currentRestartAttempt < maxNumberRestartAttempts;
}
@Override
......@@ -84,11 +83,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
}, executionGraph.getExecutionContext());
}
@Override
public void disable() {
disabled = true;
}
/**
* Creates a FixedDelayRestartStrategy from the given Configuration.
*
......
......@@ -36,9 +36,6 @@ public class NoRestartStrategy implements RestartStrategy {
throw new RuntimeException("NoRestartStrategy does not support restart.");
}
@Override
public void disable() {}
/**
* Creates a NoRestartStrategy instance.
*
......
......@@ -38,9 +38,4 @@ public interface RestartStrategy {
* @param executionGraph The ExecutionGraph to be restarted
*/
void restart(ExecutionGraph executionGraph);
/**
* Disables the restart strategy.
*/
void disable();
}
......@@ -24,38 +24,52 @@ package org.apache.flink.runtime.jobgraph;
public enum JobStatus {
/** Job is newly created, no task has started to run. */
CREATED(false),
CREATED(TerminalState.NON_TERMINAL),
/** Some tasks are scheduled or running, some may be pending, some may be finished. */
RUNNING(false),
RUNNING(TerminalState.NON_TERMINAL),
/** The job has failed and is currently waiting for the cleanup to complete */
FAILING(false),
FAILING(TerminalState.NON_TERMINAL),
/** The job has failed with a non-recoverable task failure */
FAILED(true),
FAILED(TerminalState.GLOBALLY),
/** Job is being cancelled */
CANCELLING(false),
CANCELLING(TerminalState.NON_TERMINAL),
/** Job has been cancelled */
CANCELED(true),
CANCELED(TerminalState.GLOBALLY),
/** All of the job's tasks have successfully finished. */
FINISHED(true),
FINISHED(TerminalState.GLOBALLY),
/** The job is currently undergoing a reset and total restart */
RESTARTING(false);
RESTARTING(TerminalState.NON_TERMINAL),
/**
* The job has been suspended which means that it has been stopped but not been removed from a
* potential HA job store.
*/
SUSPENDED(TerminalState.LOCALLY);
// --------------------------------------------------------------------------------------------
enum TerminalState {
NON_TERMINAL,
LOCALLY,
GLOBALLY
}
private final boolean terminalState;
private final TerminalState terminalState;
JobStatus(boolean terminalState) {
JobStatus(TerminalState terminalState) {
this.terminalState = terminalState;
}
public boolean isTerminalState() {
return terminalState;
public boolean isGloballyTerminalState() {
return terminalState == TerminalState.GLOBALLY;
}
}
......@@ -168,7 +168,7 @@ public final class WebMonitorUtils {
JobStatus status = job.getState();
long started = job.getStatusTimestamp(JobStatus.CREATED);
long finished = status.isTerminalState() ? job.getStatusTimestamp(status) : -1L;
long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
int[] countsPerStatus = new int[ExecutionState.values().length];
long lastChanged = 0;
......
......@@ -210,8 +210,7 @@ class JobManager(
log.info(s"Stopping JobManager $getAddress.")
val newFuturesToComplete = cancelAndClearEverything(
new Exception("The JobManager is shutting down."),
removeJobFromStateBackend = true)
new Exception("The JobManager is shutting down."))
implicit val executionContext = context.dispatcher
......@@ -307,8 +306,7 @@ class JobManager(
log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
val newFuturesToComplete = cancelAndClearEverything(
new Exception("JobManager is no longer the leader."),
removeJobFromStateBackend = false)
new Exception("JobManager is no longer the leader."))
futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
......@@ -746,7 +744,7 @@ class JobManager(
s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
error)
if (newJobStatus.isTerminalState()) {
if (newJobStatus.isGloballyTerminalState()) {
jobInfo.end = timeStamp
future{
......@@ -951,7 +949,7 @@ class JobManager(
case RemoveCachedJob(jobID) =>
currentJobs.get(jobID) match {
case Some((graph, info)) =>
if (graph.getState.isTerminalState) {
if (graph.getState.isGloballyTerminalState) {
removeJob(graph.getJobID, removeJobFromStateBackend = true) match {
case Some(futureToComplete) =>
futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
......@@ -1632,23 +1630,11 @@ class JobManager(
*
* @param cause Cause for the cancelling.
*/
private def cancelAndClearEverything(
cause: Throwable,
removeJobFromStateBackend: Boolean)
private def cancelAndClearEverything(cause: Throwable)
: Seq[Future[Unit]] = {
val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
future {
if (removeJobFromStateBackend) {
try {
submittedJobGraphs.removeJobGraph(jobID)
}
catch {
case t: Throwable =>
log.error("Error during submitted job graph clean up.", t)
}
}
eg.fail(cause)
eg.suspend(cause)
if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
jobInfo.client ! decorateMessage(
......@@ -1667,7 +1653,6 @@ class JobManager(
}
override def revokeLeadership(): Unit = {
leaderSessionID = None
self ! decorateMessage(RevokeLeadership)
}
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
import akka.dispatch.Futures;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionConfigTest;
import org.apache.flink.api.common.JobID;
......@@ -40,10 +41,14 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
......@@ -667,6 +672,122 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.RESTARTING, eg.getState());
}
/**
* Tests that a suspend call while restarting a job, will abort the restarting.
*
* @throws Exception
*/
@Test
public void testSuspendWhileRestarting() throws Exception {
FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES);
Deadline deadline = timeout.fromNow();
Instance instance = ExecutionGraphTestUtils.getInstance(
new SimpleActorGateway(TestingUtils.directExecutionContext()),
NUM_TASKS);
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
JobVertex sender = new JobVertex("Task");
sender.setInvokableClass(Tasks.NoOpInvokable.class);
sender.setParallelism(NUM_TASKS);
JobGraph jobGraph = new JobGraph("Pointwise job", sender);
ControllableRestartStrategy controllableRestartStrategy = new ControllableRestartStrategy(timeout);
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
new JobID(),
"Test job",
new Configuration(),
ExecutionConfigTest.getSerializedConfig(),
AkkaUtils.getDefaultTimeout(),
controllableRestartStrategy);
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
assertEquals(JobStatus.CREATED, eg.getState());
eg.scheduleForExecution(scheduler);
assertEquals(JobStatus.RUNNING, eg.getState());
instance.markDead();
Await.ready(controllableRestartStrategy.getReachedCanRestart(), deadline.timeLeft());
assertEquals(JobStatus.RESTARTING, eg.getState());
eg.suspend(new Exception("Test exception"));
assertEquals(JobStatus.SUSPENDED, eg.getState());
controllableRestartStrategy.unlockRestart();
Await.ready(controllableRestartStrategy.getRestartDone(), deadline.timeLeft());
assertEquals(JobStatus.SUSPENDED, eg.getState());
}
private static class ControllableRestartStrategy implements RestartStrategy {
private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
private Promise<Boolean> doRestart = new Promise.DefaultPromise<>();
private Promise<Boolean> restartDone = new Promise.DefaultPromise<>();
private volatile Exception exception = null;
private FiniteDuration timeout;
public ControllableRestartStrategy(FiniteDuration timeout) {
this.timeout = timeout;
}
public void unlockRestart() {
doRestart.success(true);
}
public Exception getException() {
return exception;
}
public Future<Boolean> getReachedCanRestart() {
return reachedCanRestart.future();
}
public Future<Boolean> getRestartDone() {
return restartDone.future();
}
@Override
public boolean canRestart() {
reachedCanRestart.success(true);
return true;
}
@Override
public void restart(final ExecutionGraph executionGraph) {
Futures.future(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Await.ready(doRestart.future(), timeout);
executionGraph.restart();
} catch (Exception e) {
exception = e;
}
restartDone.success(true);
return null;
}
}, TestingUtils.defaultExecutionContext());
}
}
private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
......
......@@ -196,7 +196,73 @@ public class ExecutionGraphSignalsTest {
for (int i = 0; i < mockEJV.length; ++i) {
verify(mockEJV[i], times(times)).cancel();
}
}
/**
* Tests that suspend cancels the ExecutionJobVertices and transitions to SUSPENDED state.
* Tests also that one cannot leave the SUSPENDED state to enter a terminal state.
*/
@Test
public void testSuspend() throws Exception {
Assert.assertEquals(JobStatus.CREATED, eg.getState());
Exception testException = new Exception("Test exception");
eg.suspend(testException);
verifyCancel(1);
Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
f.set(eg, JobStatus.RUNNING);
eg.suspend(testException);
verifyCancel(2);
Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
f.set(eg, JobStatus.FAILING);
eg.suspend(testException);
verifyCancel(3);
Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
f.set(eg, JobStatus.CANCELLING);
eg.suspend(testException);
verifyCancel(4);
Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
f.set(eg, JobStatus.FAILED);
eg.suspend(testException);
verifyCancel(4);
Assert.assertEquals(JobStatus.FAILED, eg.getState());
f.set(eg, JobStatus.FINISHED);
eg.suspend(testException);
verifyCancel(4);
Assert.assertEquals(JobStatus.FINISHED, eg.getState());
f.set(eg, JobStatus.CANCELED);
eg.suspend(testException);
verifyCancel(4);
Assert.assertEquals(JobStatus.CANCELED, eg.getState());
f.set(eg, JobStatus.SUSPENDED);
eg.fail(testException);
Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
eg.cancel();
Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
}
// test that all source tasks receive STOP signal
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.HeapStateStore;
import org.apache.flink.runtime.checkpoint.SavepointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Int;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class JobManagerHARecoveryTest {
private static ActorSystem system;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@BeforeClass
public static void setup() {
system = AkkaUtils.createLocalActorSystem(new Configuration());
}
@AfterClass
public static void teardown() {
JavaTestKit.shutdownActorSystem(system);
}
/**
* Tests that the persisted job is not removed from the SubmittedJobGraphStore if the JobManager
* loses its leadership. Furthermore, it tests that the job manager can recover the job from
* the SubmittedJobGraphStore.
*
* @throws Exception
*/
@Test
public void testJobRecoveryWhenLosingLeadership() throws Exception {
FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
Configuration flinkConfiguration = new Configuration();
UUID leaderSessionID = UUID.randomUUID();
UUID newLeaderSessionID = UUID.randomUUID();
int slots = 2;
ActorRef archive = null;
ActorRef jobManager = null;
ActorRef taskManager = null;
flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.newFolder().toString());
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
try {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
InstanceManager instanceManager = new InstanceManager();
instanceManager.addInstanceListener(scheduler);
archive = system.actorOf(Props.create(
MemoryArchivist.class,
10), "archive");
Props jobManagerProps = Props.create(
TestingJobManager.class,
flinkConfiguration,
new ForkJoinPool(),
instanceManager,
scheduler,
new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
archive,
new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
timeout,
myLeaderElectionService,
mySubmittedJobGraphStore,
new StandaloneCheckpointRecoveryFactory(),
new SavepointStore(new HeapStateStore()),
jobRecoveryTimeout);
jobManager = system.actorOf(jobManagerProps, "jobmanager");
ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
taskManager = TaskManager.startTaskManagerComponentsAndActor(
flinkConfiguration,
ResourceID.generate(),
system,
"localhost",
Option.apply("taskmanager"),
Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
true,
TestingTaskManager.class);
ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID);
Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
Await.ready(tmAlive, deadline.timeLeft());
JobVertex sourceJobVertex = new JobVertex("Source");
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
sourceJobVertex.setParallelism(slots);
JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
Future<Object> isLeader = gateway.ask(
TestingJobManagerMessages.getNotifyWhenLeader(),
deadline.timeLeft());
Future<Object> isConnectedToJobManager = tmGateway.ask(
new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
deadline.timeLeft());
// tell jobManager that he's the leader
myLeaderElectionService.isLeader(leaderSessionID);
// tell taskManager who's the leader
myLeaderRetrievalService.notifyListener(gateway.path(), leaderSessionID);
Await.ready(isLeader, deadline.timeLeft());
Await.ready(isConnectedToJobManager, deadline.timeLeft());
// submit blocking job
Future<Object> jobSubmitted = gateway.ask(
new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
deadline.timeLeft());
Await.ready(jobSubmitted, deadline.timeLeft());
Future<Object> jobRemoved = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
// Revoke leadership
myLeaderElectionService.notLeader();
// check that the job gets removed from the JobManager
Await.ready(jobRemoved, deadline.timeLeft());
// but stays in the submitted job graph store
assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
Future<Object> jobRunning = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
// Make JobManager again a leader
myLeaderElectionService.isLeader(newLeaderSessionID);
// tell the TaskManager about it
myLeaderRetrievalService.notifyListener(gateway.path(), newLeaderSessionID);
// wait that the job is recovered and reaches state RUNNING
Await.ready(jobRunning, deadline.timeLeft());
Future<Object> jobFinished = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
BlockingInvokable.unblock();
// wait til the job has finished
Await.ready(jobFinished, deadline.timeLeft());
// check that the job has been removed from the submitted job graph store
assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
} finally {
if (archive != null) {
archive.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
if (jobManager != null) {
jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
if (taskManager != null) {
taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
}
}
static class MySubmittedJobGraphStore implements SubmittedJobGraphStore {
Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
@Override
public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
return new ArrayList<>(storedJobs.values());
}
@Override
public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
if (storedJobs.containsKey(jobId)) {
return Option.apply(storedJobs.get(jobId));
} else {
return Option.apply(null);
}
}
@Override
public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
storedJobs.put(jobGraph.getJobId(), jobGraph);
}
@Override
public void removeJobGraph(JobID jobId) throws Exception {
storedJobs.remove(jobId);
}
boolean contains(JobID jobId) {
return storedJobs.containsKey(jobId);
}
}
public static class BlockingInvokable extends AbstractInvokable {
private static boolean blocking = true;
private static Object lock = new Object();
@Override
public void invoke() throws Exception {
while(blocking) {
synchronized (lock) {
lock.wait();
}
}
}
public static void unblock() {
blocking = false;
synchronized (lock) {
lock.notifyAll();
}
}
}
}
......@@ -23,11 +23,11 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
......@@ -70,7 +70,11 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, new FixedDelayRestartStrategy(9999, 100));
configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999);
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 milli");
cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
cluster.start(false);
// wait for actors to be alive so that they have started their leader retrieval service
......@@ -170,7 +174,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message;
if (jobStatusChanged.newJobStatus().isTerminalState()) {
if (jobStatusChanged.newJobStatus().isGloballyTerminalState() || jobStatusChanged.newJobStatus() == JobStatus.SUSPENDED) {
terminalState.success(true);
}
}
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.instance.ActorGateway;
......@@ -35,6 +34,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
......@@ -47,6 +48,8 @@ import static org.junit.Assert.*;
public class LeaderChangeStateCleanupTest extends TestLogger {
private static Logger LOG = LoggerFactory.getLogger(LeaderChangeStateCleanupTest.class);
private static FiniteDuration timeout = TestingUtils.TESTING_DURATION();
private int numJMs = 2;
......@@ -68,7 +71,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, null);
cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
cluster.start(false); // TaskManagers don't have to register at the JobManager
cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive
......@@ -225,11 +228,15 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
// make JM(0) again the leader --> this implies first a leadership revokal
LOG.info("Make JM(0) again the leader. This should first revoke the leadership.");
// make JM(0) again the leader --> this implies first a leadership revocation
cluster.grantLeadership(0, newLeaderSessionID);
Await.ready(jobRemoval, timeout);
LOG.info("Job removed.");
// The TMs should not be able to reconnect since they don't know the current leader
// session ID
try {
......@@ -239,6 +246,8 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
// expected exception since the TMs have still the old leader session ID
}
LOG.info("Notify TMs about the new (old) leader.");
// notify the TMs about the new (old) leader
cluster.notifyRetrievalListeners(0, newLeaderSessionID);
......
......@@ -39,7 +39,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
private final Configuration userConfiguration;
private final boolean useSingleActorSystem;
private final RestartStrategy restartStrategy;
public List<TestingLeaderElectionService> leaderElectionServices;
public List<TestingLeaderRetrievalService> leaderRetrievalServices;
......@@ -49,8 +48,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
public LeaderElectionRetrievalTestingCluster(
Configuration userConfiguration,
boolean singleActorSystem,
boolean synchronousDispatcher,
RestartStrategy restartStrategy) {
boolean synchronousDispatcher) {
super(userConfiguration, singleActorSystem, synchronousDispatcher);
this.userConfiguration = userConfiguration;
......@@ -58,8 +56,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
leaderElectionServices = new ArrayList<TestingLeaderElectionService>();
leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>();
this.restartStrategy = restartStrategy;
}
@Override
......@@ -95,15 +91,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
}
@Override
public RestartStrategy getRestartStrategy(RestartStrategy other) {
if (this.restartStrategy != null) {
return this.restartStrategy;
} else {
return other;
}
}
public void grantLeadership(int index, UUID leaderSessionID) {
if(leaderIndex >= 0) {
// first revoke leadership
......
......@@ -201,7 +201,7 @@ public class TaskCancelTest {
if (status.status() == JobStatus.RUNNING) {
return;
}
else if (status.status().isTerminalState()) {
else if (status.status().isGloballyTerminalState()) {
throw new Exception("JobStatus changed to " + status.status()
+ " while waiting for job to start running.");
}
......
......@@ -79,7 +79,7 @@ public class JobManagerActorTestUtils {
if (jobStatus == expectedJobStatus) {
return;
}
else if (jobStatus.isTerminalState()) {
else if (jobStatus.isGloballyTerminalState()) {
throw new IllegalStateException("Job is in terminal state " + jobStatus + ", "
+ "but was waiting for " + expectedJobStatus + ".");
}
......
......@@ -186,10 +186,6 @@ class TestingCluster(
None
}
def getRestartStrategy(restartStrategy: RestartStrategy) = {
restartStrategy
}
@throws(classOf[TimeoutException])
@throws(classOf[InterruptedException])
def waitForTaskManagersToBeAlive(): Unit = {
......
......@@ -86,7 +86,7 @@ public class JobManagerCommunicationUtils {
if (status == null) {
throw new Exception("Could not cancel job - no running jobs");
}
else if (status.getJobState().isTerminalState()) {
else if (status.getJobState().isGloballyTerminalState()) {
throw new Exception("Could not cancel job - job is not running any more");
}
......
......@@ -25,7 +25,6 @@ import akka.actor.UntypedActor;
import akka.testkit.TestActorRef;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
......@@ -46,6 +45,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
......@@ -119,10 +119,10 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
// ---------------------------------------------------------------------------------------------
/**
* Tests that the recovery state is cleaned up after a JobManager stops.
* Tests that the HA job is not cleaned up when the jobmanager is stopped.
*/
@Test
public void testJobManagerCleanUp() throws Exception {
public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
......@@ -153,8 +153,9 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
flink.shutdown();
}
// Verify that everything is clean
verifyCleanRecoveryState(config);
// verify that the persisted job data has not been removed from ZooKeeper when the JM has
// been shutdown
verifyRecoveryState(config);
}
/**
......@@ -225,6 +226,14 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
if (!success) {
fail("Non-leading JM was still holding reference to the job graph.");
}
Future<Object> jobRemoved = leadingJobManager.ask(
new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
deadline.timeLeft());
leadingJobManager.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
Await.ready(jobRemoved, deadline.timeLeft());
}
finally {
flink.shutdown();
......@@ -482,4 +491,36 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
}
}
/**
* Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
*/
private static void verifyRecoveryState(Configuration config) throws Exception {
// File state backend empty
Collection<File> stateHandles = FileUtils.listFiles(
FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
if (stateHandles.isEmpty()) {
fail("File state backend has been cleaned: " + stateHandles);
}
// ZooKeeper
String currentJobsPath = config.getString(
ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
if (stat.getCversion() == 0) {
// Sanity check: verify that some changes have been performed
fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
"this test. What are you testing?");
}
if (stat.getNumChildren() == 0) {
// Children have been cleaned up?
fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " +
ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
}
}
}
......@@ -150,7 +150,7 @@ class YarnJobManager(
log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
s"job $stopWhenJobFinished")
} else {
if (jobStatus.status.isTerminalState) {
if (jobStatus.status.isGloballyTerminalState) {
log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
s"Shutting down YARN session")
if (jobStatus.status == JobStatus.FINISHED) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册