提交 cfe62934 编写于 作者: T Till Rohrmann

[FLINK-4046] [runtime] Add direct state transition from RESTARTING to FAILED

A job can get stuck in FAILING if fail is called on a restarting job which has
not yet reset its ExecutionJobVertices, because these vertices would not call
jobVertexInFinalState. This method, however, must be called in order to transition
from FAILING to FAILED.

Accept state FAILED when calling ExecutionGraph.restart

This closes #2095.
上级 37defbb4
......@@ -810,8 +810,15 @@ public class ExecutionGraph implements Serializable {
JobStatus current = state;
if (current == JobStatus.FAILING || current.isTerminalState()) {
return;
}
else if (transitionState(current, JobStatus.FAILING, t)) {
} else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
synchronized (progressLock) {
postRunCleanup();
progressLock.notifyAll();
LOG.info("Job {} failed during restart.", getJobID());
return;
}
} else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = t;
if (!verticesInCreationOrder.isEmpty()) {
......@@ -839,8 +846,10 @@ public class ExecutionGraph implements Serializable {
if (current == JobStatus.CANCELED) {
LOG.info("Canceled job during restart. Aborting restart.");
return;
}
else if (current != JobStatus.RESTARTING) {
} else if (current == JobStatus.FAILED) {
LOG.info("Failed job during restart. Aborting restart.");
return;
} else if (current != JobStatus.RESTARTING) {
throw new IllegalStateException("Can only restart job from state restarting.");
}
......
......@@ -264,6 +264,65 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.CANCELED, executionGraph.getState());
}
/**
 * Checks that calling {@code fail} on a job that is in state RESTARTING moves the job
 * directly to FAILED, and that a later {@code restart} call leaves it in FAILED.
 */
@Test
public void testFailWhileRestarting() throws Exception {
	// Register a single instance (created with NUM_TASKS) at a fresh scheduler.
	Scheduler taskScheduler = new Scheduler(TestingUtils.defaultExecutionContext());
	SimpleActorGateway gateway = new SimpleActorGateway(TestingUtils.directExecutionContext());
	Instance workerInstance = ExecutionGraphTestUtils.getInstance(gateway, NUM_TASKS);
	taskScheduler.newInstanceAvailable(workerInstance);

	// The restart strategy is given maximal values so that the test, not the strategy,
	// decides when restart() is invoked.
	ExecutionGraph graph = new ExecutionGraph(
		TestingUtils.defaultExecutionContext(),
		new JobID(),
		"TestJob",
		new Configuration(),
		ExecutionConfigTest.getSerializedConfig(),
		AkkaUtils.getDefaultTimeout(),
		new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));

	// Single-vertex job running the no-op invokable with parallelism NUM_TASKS.
	JobVertex vertex = new JobVertex("NoOpInvokable");
	vertex.setInvokableClass(Tasks.NoOpInvokable.class);
	vertex.setParallelism(NUM_TASKS);
	JobGraph job = new JobGraph("TestJob", vertex);

	graph.attachJobGraph(job.getVerticesSortedTopologicallyFromSources());
	assertEquals(JobStatus.CREATED, graph.getState());

	graph.scheduleForExecution(taskScheduler);
	assertEquals(JobStatus.RUNNING, graph.getState());

	// Mark the instance dead, then poll until the job reports RESTARTING or the
	// testing deadline runs out.
	workerInstance.markDead();
	Deadline restartDeadline = TestingUtils.TESTING_DURATION().fromNow();
	for (;;) {
		if (!restartDeadline.hasTimeLeft() || graph.getState() == JobStatus.RESTARTING) {
			break;
		}
		Thread.sleep(100);
	}
	assertEquals(JobStatus.RESTARTING, graph.getState());

	// Failing the job while it is restarting must take it straight to FAILED.
	graph.fail(new Exception("Test exception"));
	assertEquals(JobStatus.FAILED, graph.getState());

	// A restart attempt on the failed job must not change its state.
	graph.restart();
	assertEquals(JobStatus.FAILED, graph.getState());
}
@Test
public void testCancelWhileFailing() throws Exception {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册