diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index aae0b7ce69be8abf0b941332af5672da468306ba..1e5d02cb954314f94365d10bad5976eb97bd19cb 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -711,6 +711,26 @@ public class ExecutionGraph implements Serializable { return; } } + // Executions are being canceled. Go into cancelling and wait for + // all vertices to be in their final state. + else if (current == JobStatus.FAILING) { + if (transitionState(current, JobStatus.CANCELLING)) { + return; + } + } + // All vertices have been cancelled and it's safe to directly go + // into the canceled state. + else if (current == JobStatus.RESTARTING) { + synchronized (progressLock) { + if (transitionState(current, JobStatus.CANCELED)) { + postRunCleanup(); + progressLock.notifyAll(); + + LOG.info("Canceled during restart."); + return; + } + } + } else { // no need to treat other states return; @@ -747,9 +767,16 @@ public class ExecutionGraph implements Serializable { public void restart() { try { synchronized (progressLock) { - if (state != JobStatus.RESTARTING) { + JobStatus current = state; + + if (current == JobStatus.CANCELED) { + LOG.info("Canceled job during restart. Aborting restart."); + return; + } + else if (current != JobStatus.RESTARTING) { throw new IllegalStateException("Can only restart job from state restarting."); } + if (scheduler == null) { throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 57b1829b5a0d4bbca922a78f1e31c035b7f94d5a..a50aa2e5d5267014d92da7f53c2b89f4d40235d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -37,6 +38,9 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; public class ExecutionGraphRestartTest { @@ -158,4 +162,141 @@ public class ExecutionGraphRestartTest { fail("Failed to wait until all execution attempts left the state DEPLOYING."); } } + + @Test + public void testCancelWhileRestarting() throws Exception { + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + + Instance instance = ExecutionGraphTestUtils.getInstance( + new SimpleActorGateway(TestingUtils.directExecutionContext()), + NUM_TASKS); + + scheduler.newInstanceAvailable(instance); + + // Blocking program + ExecutionGraph executionGraph = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + new JobID(), + "TestJob", + new Configuration(), + AkkaUtils.getDefaultTimeout()); + + JobVertex jobVertex = new JobVertex("NoOpInvokable"); + jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + jobVertex.setParallelism(NUM_TASKS); + + JobGraph jobGraph = new JobGraph("TestJob", jobVertex); + + // We want to manually control the restart and delay + executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE); + executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE); + executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + assertEquals(JobStatus.CREATED, executionGraph.getState()); + + executionGraph.scheduleForExecution(scheduler); + + assertEquals(JobStatus.RUNNING, executionGraph.getState()); + + // Kill the instance and wait for the job to restart + instance.markDead(); + + Deadline deadline = TestingUtils.TESTING_DURATION().fromNow(); + + while (deadline.hasTimeLeft() && + executionGraph.getState() != JobStatus.RESTARTING) { + + Thread.sleep(100); + } + + assertEquals(JobStatus.RESTARTING, executionGraph.getState()); + + // Canceling needs to abort the restart + executionGraph.cancel(); + + assertEquals(JobStatus.CANCELED, executionGraph.getState()); + + // The restart has been aborted + executionGraph.restart(); + + assertEquals(JobStatus.CANCELED, executionGraph.getState()); + } + + @Test + public void testCancelWhileFailing() throws Exception { + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + + Instance instance = ExecutionGraphTestUtils.getInstance( + new SimpleActorGateway(TestingUtils.directExecutionContext()), + NUM_TASKS); + + scheduler.newInstanceAvailable(instance); + + // Blocking program + ExecutionGraph executionGraph = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + new JobID(), + "TestJob", + new Configuration(), + AkkaUtils.getDefaultTimeout()); + + // Spy on the graph + executionGraph = spy(executionGraph); + + // Do nothing here, because we don't want to transition out of + // the FAILING state. + doNothing().when(executionGraph).jobVertexInFinalState(); + + JobVertex jobVertex = new JobVertex("NoOpInvokable"); + jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + jobVertex.setParallelism(NUM_TASKS); + + JobGraph jobGraph = new JobGraph("TestJob", jobVertex); + + // We want to manually control the restart and delay + executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE); + executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE); + executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + assertEquals(JobStatus.CREATED, executionGraph.getState()); + + executionGraph.scheduleForExecution(scheduler); + + assertEquals(JobStatus.RUNNING, executionGraph.getState()); + + // Kill the instance... + instance.markDead(); + + Deadline deadline = TestingUtils.TESTING_DURATION().fromNow(); + + // ...and wait for all vertices to be in state FAILED. The + // jobVertexInFinalState does nothing, that's why we don't wait on the + // job status. + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + success = true; + for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) { + if (vertex.getExecutionState() != ExecutionState.FAILED) { + success = false; + Thread.sleep(100); + break; + } + } + } + + // Still in failing + assertEquals(JobStatus.FAILING, executionGraph.getState()); + + // The cancel call needs to change the state to CANCELLING + executionGraph.cancel(); + + assertEquals(JobStatus.CANCELLING, executionGraph.getState()); + + // Unspy and finalize the job state + doCallRealMethod().when(executionGraph).jobVertexInFinalState(); + + executionGraph.jobVertexInFinalState(); + + assertEquals(JobStatus.CANCELED, executionGraph.getState()); + } }