提交 e0061272 编写于 作者: S Stephan Ewen

[FLINK-6340] [flip-1] Add a termination future to the Execution

上级 aadfe45a
...@@ -121,6 +121,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -121,6 +121,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors; private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
/** A future that completes once the Execution reaches a terminal ExecutionState */
private final FlinkCompletableFuture<ExecutionState> terminationFuture;
private volatile ExecutionState state = CREATED; private volatile ExecutionState state = CREATED;
private volatile SimpleSlot assignedResource; // once assigned, never changes until the execution is archived private volatile SimpleSlot assignedResource; // once assigned, never changes until the execution is archived
...@@ -161,6 +164,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -161,6 +164,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
markTimestamp(ExecutionState.CREATED, startTimestamp); markTimestamp(ExecutionState.CREATED, startTimestamp);
this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>(); this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
this.terminationFuture = new FlinkCompletableFuture<>();
} }
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
...@@ -234,6 +238,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -234,6 +238,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
this.taskState = checkpointStateHandles; this.taskState = checkpointStateHandles;
} }
/**
* Gets a future that completes once the task execution reaches a terminal state.
* The future will be completed with specific state that the execution reached.
*
* @return A future for the execution's termination
*/
public Future<ExecutionState> getTerminationFuture() {
return terminationFuture;
}
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Actions // Actions
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
...@@ -473,7 +487,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -473,7 +487,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
} }
} }
finally { finally {
vertex.executionCanceled(); vertex.executionCanceled(this);
terminationFuture.complete(CANCELED);
} }
return; return;
} }
...@@ -741,7 +756,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -741,7 +756,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
vertex.getExecutionGraph().deregisterExecution(this); vertex.getExecutionGraph().deregisterExecution(this);
} }
finally { finally {
vertex.executionFinished(); vertex.executionFinished(this);
terminationFuture.complete(FINISHED);
} }
return; return;
} }
...@@ -793,7 +809,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -793,7 +809,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
vertex.getExecutionGraph().deregisterExecution(this); vertex.getExecutionGraph().deregisterExecution(this);
} }
finally { finally {
vertex.executionCanceled(); vertex.executionCanceled(this);
terminationFuture.complete(CANCELED);
} }
return; return;
} }
...@@ -886,7 +903,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -886,7 +903,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
vertex.getExecutionGraph().deregisterExecution(this); vertex.getExecutionGraph().deregisterExecution(this);
} }
finally { finally {
vertex.executionFailed(t); vertex.executionFailed(this, t);
terminationFuture.complete(FAILED);
} }
if (!isCallback && (current == RUNNING || current == DEPLOYING)) { if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
......
...@@ -40,6 +40,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; ...@@ -40,6 +40,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
...@@ -89,6 +90,7 @@ import java.util.concurrent.ExecutionException; ...@@ -89,6 +90,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkArgument;
...@@ -188,6 +190,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -188,6 +190,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Registered KvState instances reported by the TaskManagers. */ /** Registered KvState instances reported by the TaskManagers. */
private final KvStateLocationRegistry kvStateLocationRegistry; private final KvStateLocationRegistry kvStateLocationRegistry;
private int numVerticesTotal;
// ------ Configuration of the Execution ------- // ------ Configuration of the Execution -------
/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able /** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
...@@ -203,6 +207,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -203,6 +207,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// ------ Execution status and progress. These values are volatile, and accessed under the lock ------- // ------ Execution status and progress. These values are volatile, and accessed under the lock -------
private final AtomicInteger verticesFinished;
/** Current status of the job execution */ /** Current status of the job execution */
private volatile JobStatus state = JobStatus.CREATED; private volatile JobStatus state = JobStatus.CREATED;
...@@ -210,9 +216,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -210,9 +216,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* that was not recoverable and triggered job failure */ * that was not recoverable and triggered job failure */
private volatile Throwable failureCause; private volatile Throwable failureCause;
/** The number of job vertices that have reached a terminal state */
private volatile int numFinishedJobVertices;
// ------ Fields that are relevant to the execution and need to be cleared before archiving ------- // ------ Fields that are relevant to the execution and need to be cleared before archiving -------
/** The coordinator for checkpoints, if snapshot checkpoints are enabled */ /** The coordinator for checkpoints, if snapshot checkpoints are enabled */
...@@ -317,6 +320,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -317,6 +320,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
this.restartStrategy = restartStrategy; this.restartStrategy = restartStrategy;
this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices()); this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
this.verticesFinished = new AtomicInteger();
} }
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
...@@ -454,7 +459,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -454,7 +459,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return jv.getTaskVertices(); return jv.getTaskVertices();
} }
else { else {
ArrayList<ExecutionVertex> all = new ArrayList<ExecutionVertex>(); ArrayList<ExecutionVertex> all = new ArrayList<>();
for (ExecutionJobVertex jv : jobVertices) { for (ExecutionJobVertex jv : jobVertices) {
if (jv.getGraph() != this) { if (jv.getGraph() != this) {
throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph"); throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
...@@ -586,6 +591,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -586,6 +591,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}; };
} }
public int getTotalNumberOfVertices() {
return numVerticesTotal;
}
public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() { public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
return Collections.unmodifiableMap(this.intermediateResults); return Collections.unmodifiableMap(this.intermediateResults);
} }
...@@ -620,7 +629,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -620,7 +629,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
*/ */
public Map<String, Accumulator<?,?>> aggregateUserAccumulators() { public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>(); Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
for (ExecutionVertex vertex : getAllExecutionVertices()) { for (ExecutionVertex vertex : getAllExecutionVertices()) {
Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators(); Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
...@@ -657,7 +666,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -657,7 +666,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators(); Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>(); Map<String, SerializedValue<Object>> result = new HashMap<>();
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) { for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue())); result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
} }
...@@ -713,6 +722,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -713,6 +722,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
} }
this.verticesInCreationOrder.add(ejv); this.verticesInCreationOrder.add(ejv);
this.numVerticesTotal += ejv.getParallelism();
} }
} }
...@@ -878,9 +888,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -878,9 +888,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
if (current == JobStatus.RUNNING || current == JobStatus.CREATED) { if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
if (transitionState(current, JobStatus.CANCELLING)) { if (transitionState(current, JobStatus.CANCELLING)) {
final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
// cancel all tasks (that still need cancelling)
for (ExecutionJobVertex ejv : verticesInCreationOrder) { for (ExecutionJobVertex ejv : verticesInCreationOrder) {
ejv.cancel(); futures.add(ejv.cancelWithFuture());
} }
// we build a future that is complete once all vertices have reached a terminal state
final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
allTerminal.thenAccept(new AcceptFunction<Void>() {
@Override
public void accept(Void value) {
allVerticesInTerminalState();
}
});
return; return;
} }
} }
...@@ -968,25 +992,32 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -968,25 +992,32 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
current == JobStatus.SUSPENDED || current == JobStatus.SUSPENDED ||
current.isGloballyTerminalState()) { current.isGloballyTerminalState()) {
return; return;
} else if (current == JobStatus.RESTARTING) { }
else if (current == JobStatus.RESTARTING) {
this.failureCause = t; this.failureCause = t;
if (tryRestartOrFail()) { if (tryRestartOrFail()) {
return; return;
} }
// concurrent job status change, let's check again }
} else if (transitionState(current, JobStatus.FAILING, t)) { else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = t; this.failureCause = t;
if (!verticesInCreationOrder.isEmpty()) { // we build a future that is complete once all vertices have reached a terminal state
// cancel all. what is failed will not cancel but stay failed final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
// cancel all tasks (that still need cancelling)
for (ExecutionJobVertex ejv : verticesInCreationOrder) { for (ExecutionJobVertex ejv : verticesInCreationOrder) {
ejv.cancel(); futures.add(ejv.cancelWithFuture());
} }
} else {
// set the state of the job to failed final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
transitionState(JobStatus.FAILING, JobStatus.FAILED, t); allTerminal.thenAccept(new AcceptFunction<Void>() {
@Override
public void accept(Void value) {
allVerticesInTerminalState();
} }
});
return; return;
} }
...@@ -1039,7 +1070,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -1039,7 +1070,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
stateTimestamps[i] = 0; stateTimestamps[i] = 0;
} }
} }
numFinishedJobVertices = 0;
transitionState(JobStatus.RESTARTING, JobStatus.CREATED); transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
// if we have checkpointed state, reload it into the executions // if we have checkpointed state, reload it into the executions
...@@ -1097,9 +1128,24 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -1097,9 +1128,24 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* For testing: This waits until the job execution has finished. * For testing: This waits until the job execution has finished.
*/ */
public void waitUntilFinished() throws InterruptedException { public void waitUntilFinished() throws InterruptedException {
synchronized (progressLock) { // we may need multiple attempts in the presence of failures / recovery
while (!state.isTerminalState()) { while (true) {
progressLock.wait(); for (ExecutionJobVertex ejv : verticesInCreationOrder) {
for (ExecutionVertex ev : ejv.getTaskVertices()) {
try {
ev.getCurrentExecutionAttempt().getTerminationFuture().get();
}
catch (ExecutionException e) {
// this should never happen
throw new RuntimeException(e);
}
}
}
// now that all vertices have been (at some point) in a terminal state,
// we need to check if the job as a whole has entered a final state
if (state.isTerminalState()) {
return;
} }
} }
} }
...@@ -1129,16 +1175,19 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -1129,16 +1175,19 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
} }
} }
void jobVertexInFinalState() { void vertexFinished() {
synchronized (progressLock) { int numFinished = verticesFinished.incrementAndGet();
if (numFinishedJobVertices >= verticesInCreationOrder.size()) { if (numFinished == numVerticesTotal) {
throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished."); // done :-)
allVerticesInTerminalState();
}
} }
numFinishedJobVertices++; void vertexUnFinished() {
verticesFinished.getAndDecrement();
if (numFinishedJobVertices == verticesInCreationOrder.size()) { }
private void allVerticesInTerminalState() {
// we are done, transition to the final state // we are done, transition to the final state
JobStatus current; JobStatus current;
while (true) { while (true) {
...@@ -1177,11 +1226,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ...@@ -1177,11 +1226,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
} }
} }
// done transitioning the state // done transitioning the state
// also, notify waiters
progressLock.notifyAll();
}
}
} }
/** /**
......
...@@ -32,6 +32,7 @@ import org.apache.flink.core.io.LocatableInputSplit; ...@@ -32,6 +32,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.instance.SlotProvider;
...@@ -96,8 +97,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable ...@@ -96,8 +97,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private final int parallelism; private final int parallelism;
private final boolean[] finishedSubtasks;
private final SlotSharingGroup slotSharingGroup; private final SlotSharingGroup slotSharingGroup;
private final CoLocationGroup coLocationGroup; private final CoLocationGroup coLocationGroup;
...@@ -108,8 +107,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable ...@@ -108,8 +107,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private int maxParallelism; private int maxParallelism;
private volatile int numSubtasksInFinalState;
/** /**
* Serialized task information which is for all sub tasks the same. Thus, it avoids to * Serialized task information which is for all sub tasks the same. Thus, it avoids to
* serialize the same information multiple times in order to create the * serialize the same information multiple times in order to create the
...@@ -231,8 +228,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable ...@@ -231,8 +228,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
catch (Throwable t) { catch (Throwable t) {
throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t); throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
} }
finishedSubtasks = new boolean[parallelism];
} }
/** /**
...@@ -360,10 +355,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable ...@@ -360,10 +355,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return serializedTaskInformation; return serializedTaskInformation;
} }
public boolean isInFinalState() {
return numSubtasksInFinalState == parallelism;
}
@Override @Override
public ExecutionState getAggregateState() { public ExecutionState getAggregateState() {
int[] num = new int[ExecutionState.values().length]; int[] num = new int[ExecutionState.values().length];
...@@ -484,30 +475,40 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable ...@@ -484,30 +475,40 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return slots; return slots;
} }
/**
* Cancels all currently running vertex executions.
*/
public void cancel() { public void cancel() {
for (ExecutionVertex ev : getTaskVertices()) { for (ExecutionVertex ev : getTaskVertices()) {
ev.cancel(); ev.cancel();
} }
} }
public void fail(Throwable t) { /**
* Cancels all currently running vertex executions.
*
* @return A future that is complete once all tasks have canceled.
*/
public Future<Void> cancelWithFuture() {
// we collect all futures from the task cancellations
ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
// cancel each vertex
for (ExecutionVertex ev : getTaskVertices()) { for (ExecutionVertex ev : getTaskVertices()) {
ev.fail(t); futures.add(ev.cancel());
}
} }
public void waitForAllVerticesToReachFinishingState() throws InterruptedException { // return a conjunct future, which is complete once all individual tasks are canceled
synchronized (stateMonitor) { return FutureUtils.combineAll(futures);
while (numSubtasksInFinalState < parallelism) {
stateMonitor.wait();
} }
public void fail(Throwable t) {
for (ExecutionVertex ev : getTaskVertices()) {
ev.fail(t);
} }
} }
public void resetForNewExecution() { public void resetForNewExecution() {
if (!(numSubtasksInFinalState == 0 || numSubtasksInFinalState == parallelism)) {
throw new IllegalStateException("Cannot reset vertex that is not in final state");
}
synchronized (stateMonitor) { synchronized (stateMonitor) {
// check and reset the sharing groups with scheduler hints // check and reset the sharing groups with scheduler hints
...@@ -515,18 +516,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable ...@@ -515,18 +516,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
slotSharingGroup.clearTaskAssignment(); slotSharingGroup.clearTaskAssignment();
} }
// reset vertices one by one. if one reset fails, the "vertices in final state"
// fields will be consistent to handle triggered cancel calls
for (int i = 0; i < parallelism; i++) { for (int i = 0; i < parallelism; i++) {
taskVertices[i].resetForNewExecution(); taskVertices[i].resetForNewExecution();
if (finishedSubtasks[i]) {
finishedSubtasks[i] = false;
numSubtasksInFinalState--;
}
}
if (numSubtasksInFinalState != 0) {
throw new RuntimeException("Bug: resetting the execution job vertex failed.");
} }
// set up the input splits again // set up the input splits again
...@@ -549,51 +540,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable ...@@ -549,51 +540,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
} }
} }
//---------------------------------------------------------------------------------------------
// Notifications
//---------------------------------------------------------------------------------------------
void vertexFinished(int subtask) {
subtaskInFinalState(subtask);
}
void vertexCancelled(int subtask) {
subtaskInFinalState(subtask);
}
void vertexFailed(int subtask, Throwable error) {
subtaskInFinalState(subtask);
}
private void subtaskInFinalState(int subtask) {
synchronized (stateMonitor) {
if (!finishedSubtasks[subtask]) {
finishedSubtasks[subtask] = true;
if (numSubtasksInFinalState+1 == parallelism) {
// call finalizeOnMaster hook
try {
getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
}
catch (Throwable t) {
getGraph().fail(t);
}
numSubtasksInFinalState++;
// we are in our final state
stateMonitor.notifyAll();
// tell the graph
graph.jobVertexInFinalState();
} else {
numSubtasksInFinalState++;
}
}
}
}
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Accumulators / Metrics // Accumulators / Metrics
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
......
...@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Archiveable; ...@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
...@@ -60,8 +61,6 @@ import java.util.List; ...@@ -60,8 +61,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
/** /**
...@@ -509,30 +508,41 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi ...@@ -509,30 +508,41 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// Actions // Actions
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
public void resetForNewExecution() { public Execution resetForNewExecution() {
LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex()); LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
synchronized (priorExecutions) { synchronized (priorExecutions) {
Execution execution = currentExecution; final Execution oldExecution = currentExecution;
ExecutionState state = execution.getState(); final ExecutionState oldState = oldExecution.getState();
if (state == FINISHED || state == CANCELED || state == FAILED) { if (oldState.isTerminal()) {
priorExecutions.add(execution); priorExecutions.add(oldExecution);
currentExecution = new Execution(
final Execution newExecution = new Execution(
getExecutionGraph().getFutureExecutor(), getExecutionGraph().getFutureExecutor(),
this, this,
execution.getAttemptNumber()+1, oldExecution.getAttemptNumber()+1,
System.currentTimeMillis(), System.currentTimeMillis(),
timeout); timeout);
this.currentExecution = newExecution;
CoLocationGroup grp = jobVertex.getCoLocationGroup(); CoLocationGroup grp = jobVertex.getCoLocationGroup();
if (grp != null) { if (grp != null) {
this.locationConstraint = grp.getLocationConstraint(subTaskIndex); this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
} }
// if the execution was 'FINISHED' before, tell the ExecutionGraph that
// we take one step back on the road to reaching global FINISHED
if (oldState == FINISHED) {
getExecutionGraph().vertexUnFinished();
}
return newExecution;
} }
else { else {
throw new IllegalStateException("Cannot reset a vertex that is in state " + state); throw new IllegalStateException("Cannot reset a vertex that is in non-terminal state " + oldState);
} }
} }
} }
...@@ -545,8 +555,16 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi ...@@ -545,8 +555,16 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.currentExecution.deployToSlot(slot); this.currentExecution.deployToSlot(slot);
} }
public void cancel() { /**
this.currentExecution.cancel(); *
* @return A future that completes once the execution has reached its final state.
*/
public Future<ExecutionState> cancel() {
// to avoid any case of mixup in the presence of concurrent calls,
// we copy a reference to the stack to make sure both calls go to the same Execution
final Execution exec = this.currentExecution;
exec.cancel();
return exec.getTerminationFuture();
} }
public void stop() { public void stop() {
...@@ -621,16 +639,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi ...@@ -621,16 +639,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// Notifications from the Execution Attempt // Notifications from the Execution Attempt
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
void executionFinished() { void executionFinished(Execution execution) {
jobVertex.vertexFinished(subTaskIndex); if (execution == currentExecution) {
getExecutionGraph().vertexFinished();
}
} }
void executionCanceled() { void executionCanceled(Execution execution) {
jobVertex.vertexCancelled(subTaskIndex); // nothing to do
} }
void executionFailed(Throwable t) { void executionFailed(Execution execution, Throwable cause) {
jobVertex.vertexFailed(subTaskIndex, t); // nothing to do
} }
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
......
...@@ -60,12 +60,12 @@ import java.util.concurrent.Callable; ...@@ -60,12 +60,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
...@@ -288,48 +288,58 @@ public class ExecutionGraphRestartTest extends TestLogger { ...@@ -288,48 +288,58 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test @Test
public void testCancelWhileFailing() throws Exception { public void testCancelWhileFailing() throws Exception {
// We want to manually control the restart and delay final RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(); final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(restartStrategy);
ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
Instance instance = executionGraphInstanceTuple.f1;
doNothing().when(executionGraph).jobVertexInFinalState();
// Kill the instance... assertEquals(JobStatus.RUNNING, graph.getState());
instance.markDead();
Deadline deadline = TestingUtils.TESTING_DURATION().fromNow(); // switch all tasks to running
for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
vertex.getCurrentExecutionAttempt().switchToRunning();
}
// ...and wait for all vertices to be in state FAILED. The graph.fail(new Exception("test"));
// jobVertexInFinalState does nothing, that's why we don't wait on the
// job status. assertEquals(JobStatus.FAILING, graph.getState());
boolean success = false;
while (deadline.hasTimeLeft() && !success) { graph.cancel();
success = true;
for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) { assertEquals(JobStatus.CANCELLING, graph.getState());
ExecutionState state = vertex.getExecutionState();
if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) { // let all tasks finish cancelling
success = false; for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
Thread.sleep(100); vertex.getCurrentExecutionAttempt().cancelingComplete();
break;
} }
assertEquals(JobStatus.CANCELED, graph.getState());
} }
@Test
public void testFailWhileCanceling() throws Exception {
final RestartStrategy restartStrategy = new NoRestartStrategy();
final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
assertEquals(JobStatus.RUNNING, graph.getState());
// switch all tasks to running
for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
vertex.getCurrentExecutionAttempt().switchToRunning();
} }
// Still in failing graph.cancel();
assertEquals(JobStatus.FAILING, executionGraph.getState());
// The cancel call needs to change the state to CANCELLING assertEquals(JobStatus.CANCELLING, graph.getState());
executionGraph.cancel();
assertEquals(JobStatus.CANCELLING, executionGraph.getState()); graph.fail(new Exception("test"));
// Unspy and finalize the job state assertEquals(JobStatus.FAILING, graph.getState());
doCallRealMethod().when(executionGraph).jobVertexInFinalState();
executionGraph.jobVertexInFinalState(); // let all tasks finish cancelling
for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
vertex.getCurrentExecutionAttempt().cancelingComplete();
}
assertEquals(JobStatus.CANCELED, executionGraph.getState()); assertEquals(JobStatus.FAILED, graph.getState());
} }
@Test @Test
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.flink.runtime.executiongraph; package org.apache.flink.runtime.executiongraph;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
...@@ -52,9 +51,10 @@ import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPart ...@@ -52,9 +51,10 @@ import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPart
import org.apache.flink.runtime.messages.TaskMessages.CancelTask; import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue; import org.apache.flink.util.SerializedValue;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$; import scala.concurrent.ExecutionContext$;
...@@ -198,10 +198,6 @@ public class ExecutionGraphTestUtils { ...@@ -198,10 +198,6 @@ public class ExecutionGraphTestUtils {
} }
}; };
doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), Matchers.any(Throwable.class));
doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
return ejv; return ejv;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
public class ExecutionStateProgressTest {
@Test
public void testAccumulatedStateFinished() {
try {
final JobID jid = new JobID();
final JobVertexID vid = new JobVertexID();
JobVertex ajv = new JobVertex("TestVertex", vid);
ajv.setParallelism(3);
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
ExecutionGraph graph = new ExecutionGraph(
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
jid,
"test job",
new Configuration(),
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new Scheduler(TestingUtils.defaultExecutionContext()));
graph.attachJobGraph(Collections.singletonList(ajv));
setGraphStatus(graph, JobStatus.RUNNING);
ExecutionJobVertex ejv = graph.getJobVertex(vid);
// mock resources and mock taskmanager
for (ExecutionVertex ee : ejv.getTaskVertices()) {
SimpleSlot slot = getInstance(
new ActorTaskManagerGateway(
new SimpleActorGateway(
TestingUtils.defaultExecutionContext()))
).allocateSimpleSlot(jid);
ee.deployToSlot(slot);
}
// finish all
for (ExecutionVertex ee : ejv.getTaskVertices()) {
ee.executionFinished();
}
assertTrue(ejv.isInFinalState());
assertEquals(JobStatus.FINISHED, graph.getState());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import static org.junit.Assert.*;
public class TerminalStateDeadlockTest {
private final Field stateField;
private final Field resourceField;
private final Field execGraphStateField;
private final Field execGraphSlotProviderField;
private final SimpleSlot resource;
public TerminalStateDeadlockTest() {
try {
// the reflection fields to access the private fields
this.stateField = Execution.class.getDeclaredField("state");
this.stateField.setAccessible(true);
this.resourceField = Execution.class.getDeclaredField("assignedResource");
this.resourceField.setAccessible(true);
this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
this.execGraphStateField.setAccessible(true);
this.execGraphSlotProviderField = ExecutionGraph.class.getDeclaredField("slotProvider");
this.execGraphSlotProviderField.setAccessible(true);
// the dummy resource
ResourceID resourceId = ResourceID.generate();
InetAddress address = InetAddress.getByName("127.0.0.1");
TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345);
HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
Instance instance = new Instance(
new ActorTaskManagerGateway(DummyActorGateway.INSTANCE), ci, new InstanceID(), resources, 4);
this.resource = instance.allocateSimpleSlot(new JobID());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
// silence the compiler
throw new RuntimeException();
}
}
// ------------------------------------------------------------------------
@Test
public void testProvokeDeadlock() {
try {
final JobID jobId = resource.getJobID();
final JobVertexID vid1 = new JobVertexID();
final JobVertexID vid2 = new JobVertexID();
final List<JobVertex> vertices;
{
JobVertex v1 = new JobVertex("v1", vid1);
JobVertex v2 = new JobVertex("v2", vid2);
v1.setParallelism(1);
v2.setParallelism(1);
v1.setInvokableClass(DummyInvokable.class);
v2.setInvokableClass(DummyInvokable.class);
vertices = Arrays.asList(v1, v2);
}
final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
final Executor executor = Executors.newFixedThreadPool(4);
// try a lot!
for (int i = 0; i < 20000; i++) {
final TestExecGraph eg = new TestExecGraph(jobId);
eg.attachJobGraph(vertices);
final Execution e1 = eg.getJobVertex(vid1).getTaskVertices()[0].getCurrentExecutionAttempt();
final Execution e2 = eg.getJobVertex(vid2).getTaskVertices()[0].getCurrentExecutionAttempt();
initializeExecution(e1);
initializeExecution(e2);
execGraphStateField.set(eg, JobStatus.FAILING);
execGraphSlotProviderField.set(eg, scheduler);
Runnable r1 = new Runnable() {
@Override
public void run() {
e1.cancelingComplete();
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
e2.cancelingComplete();
}
};
executor.execute(r1);
executor.execute(r2);
eg.waitTillDone();
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
private void initializeExecution(Execution exec) throws IllegalAccessException {
// set state to canceling
stateField.set(exec, ExecutionState.CANCELING);
// assign a resource
resourceField.set(exec, resource);
}
static class TestExecGraph extends ExecutionGraph {
private static final Configuration EMPTY_CONFIG = new Configuration();
private static final Time TIMEOUT = Time.seconds(30L);
private volatile boolean done;
TestExecGraph(JobID jobId) throws IOException {
super(
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
jobId,
"test graph",
EMPTY_CONFIG,
new SerializedValue<>(new ExecutionConfig()),
TIMEOUT,
new FixedDelayRestartStrategy(1, 0),
new Scheduler(TestingUtils.defaultExecutionContext()));
}
@Override
public void scheduleForExecution() {
// notify that we are done with the "restarting"
synchronized (this) {
done = true;
this.notifyAll();
}
}
public void waitTillDone() {
try {
synchronized (this) {
while (!done) {
this.wait();
}
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册