提交 5a86a0a1 编写于 作者: U Ufuk Celebi

[FLINK-3011] [runtime] Fix cancel during restart

上级 ceabbd07
......@@ -711,6 +711,26 @@ public class ExecutionGraph implements Serializable {
return;
}
}
// Executions are being canceled. Go into cancelling and wait for
// all vertices to be in their final state.
else if (current == JobStatus.FAILING) {
if (transitionState(current, JobStatus.CANCELLING)) {
return;
}
}
// All vertices have been cancelled and it's safe to directly go
// into the canceled state.
else if (current == JobStatus.RESTARTING) {
synchronized (progressLock) {
if (transitionState(current, JobStatus.CANCELED)) {
postRunCleanup();
progressLock.notifyAll();
LOG.info("Canceled during restart.");
return;
}
}
}
else {
// no need to treat other states
return;
......@@ -747,9 +767,16 @@ public class ExecutionGraph implements Serializable {
public void restart() {
try {
synchronized (progressLock) {
if (state != JobStatus.RESTARTING) {
JobStatus current = state;
if (current == JobStatus.CANCELED) {
LOG.info("Canceled job during restart. Aborting restart.");
return;
}
else if (current != JobStatus.RESTARTING) {
throw new IllegalStateException("Can only restart job from state restarting.");
}
if (scheduler == null) {
throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
}
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
......@@ -37,6 +38,9 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
public class ExecutionGraphRestartTest {
......@@ -158,4 +162,141 @@ public class ExecutionGraphRestartTest {
fail("Failed to wait until all execution attempts left the state DEPLOYING.");
}
}
@Test
public void testCancelWhileRestarting() throws Exception {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
Instance instance = ExecutionGraphTestUtils.getInstance(
new SimpleActorGateway(TestingUtils.directExecutionContext()),
NUM_TASKS);
scheduler.newInstanceAvailable(instance);
// Blocking program
ExecutionGraph executionGraph = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
new JobID(),
"TestJob",
new Configuration(),
AkkaUtils.getDefaultTimeout());
JobVertex jobVertex = new JobVertex("NoOpInvokable");
jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
jobVertex.setParallelism(NUM_TASKS);
JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
// We want to manually control the restart and delay
executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
assertEquals(JobStatus.CREATED, executionGraph.getState());
executionGraph.scheduleForExecution(scheduler);
assertEquals(JobStatus.RUNNING, executionGraph.getState());
// Kill the instance and wait for the job to restart
instance.markDead();
Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
while (deadline.hasTimeLeft() &&
executionGraph.getState() != JobStatus.RESTARTING) {
Thread.sleep(100);
}
assertEquals(JobStatus.RESTARTING, executionGraph.getState());
// Canceling needs to abort the restart
executionGraph.cancel();
assertEquals(JobStatus.CANCELED, executionGraph.getState());
// The restart has been aborted
executionGraph.restart();
assertEquals(JobStatus.CANCELED, executionGraph.getState());
}
@Test
public void testCancelWhileFailing() throws Exception {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
Instance instance = ExecutionGraphTestUtils.getInstance(
new SimpleActorGateway(TestingUtils.directExecutionContext()),
NUM_TASKS);
scheduler.newInstanceAvailable(instance);
// Blocking program
ExecutionGraph executionGraph = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
new JobID(),
"TestJob",
new Configuration(),
AkkaUtils.getDefaultTimeout());
// Spy on the graph
executionGraph = spy(executionGraph);
// Do nothing here, because we don't want to transition out of
// the FAILING state.
doNothing().when(executionGraph).jobVertexInFinalState();
JobVertex jobVertex = new JobVertex("NoOpInvokable");
jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
jobVertex.setParallelism(NUM_TASKS);
JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
// We want to manually control the restart and delay
executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
assertEquals(JobStatus.CREATED, executionGraph.getState());
executionGraph.scheduleForExecution(scheduler);
assertEquals(JobStatus.RUNNING, executionGraph.getState());
// Kill the instance...
instance.markDead();
Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
// ...and wait for all vertices to be in state FAILED. The
// jobVertexInFinalState does nothing, that's why we don't wait on the
// job status.
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
success = true;
for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
if (vertex.getExecutionState() != ExecutionState.FAILED) {
success = false;
Thread.sleep(100);
break;
}
}
}
// Still in failing
assertEquals(JobStatus.FAILING, executionGraph.getState());
// The cancel call needs to change the state to CANCELLING
executionGraph.cancel();
assertEquals(JobStatus.CANCELLING, executionGraph.getState());
// Unspy and finalize the job state
doCallRealMethod().when(executionGraph).jobVertexInFinalState();
executionGraph.jobVertexInFinalState();
assertEquals(JobStatus.CANCELED, executionGraph.getState());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册