diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index e17a3e52416caf7fc38f774ffb2814e9088b0afc..1a3ef11ca7d5d25aa93ddbbd97c51cc390de88f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -820,7 +820,7 @@ public class Execution implements AccessExecution, Archiveable= 10, "checkpoint interval must not be below 10ms"); + checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms"); + + checkState(state == JobStatus.CREATED, "Job must be in CREATED state"); + checkState(checkpointCoordinator == null, "checkpointing already enabled"); ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger); ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor); ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); - // disable to make sure existing checkpoint coordinators are cleared - try { - disableSnaphotCheckpointing(); - } catch (Throwable t) { - LOG.error("Error while shutting down checkpointer."); - } - checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker"); // create the coordinator that triggers and commits checkpoints and holds the state @@ -416,24 +409,6 @@ public class ExecutionGraph implements AccessExecutionGraph, ArchiveableThe shutdown of the checkpoint coordinator might block. Make sure that calls to this - * method don't block the job manager actor and run asynchronously. - */ - public void disableSnaphotCheckpointing() throws Exception { - if (state != JobStatus.CREATED) { - throw new IllegalStateException("Job must be in CREATED state"); - } - - if (checkpointCoordinator != null) { - checkpointCoordinator.shutdown(state); - checkpointCoordinator = null; - checkpointStatsTracker = null; - } - } - @Override public CheckpointCoordinator getCheckpointCoordinator() { return checkpointCoordinator; @@ -761,7 +736,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable> accumulators; + + switch (state.getExecutionState()) { + case RUNNING: + return attempt.switchToRunning(); + + case FINISHED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFinished(accumulators, state.getIOMetrics()); + return true; + + case CANCELED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.cancelingComplete(accumulators, state.getIOMetrics()); + return true; + + case FAILED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics()); + return true; + + default: + // we mark as failed and return false, which triggers the TaskManager + // to remove the task + attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); + return false; + } + } + catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - switch (state.getExecutionState()) { - case RUNNING: - return attempt.switchToRunning(); - case FINISHED: - try { - Map> userAccumulators = deserializeAccumulators(state); - attempt.markFinished(userAccumulators, state.getIOMetrics()); - } - catch (Exception e) { - LOG.error("Failed to deserialize final accumulator results.", e); - attempt.markFailed(e); - } - return true; - case CANCELED: - Map> userAcc1 = deserializeAccumulators(state); - attempt.cancelingComplete(userAcc1, state.getIOMetrics()); - return true; - case FAILED: - Map> userAcc2 = deserializeAccumulators(state); - attempt.markFailed(state.getError(userClassLoader), userAcc2, state.getIOMetrics()); - return true; - default: - // we mark as failed and return false, which triggers the TaskManager - // to remove the task - attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); - return false; + // failures during updates leave the ExecutionGraph inconsistent + fail(t); + return false; } } else { @@ -1309,17 +1296,28 @@ public class ExecutionGraph implements AccessExecutionGraph, ArchiveableThis method never throws an exception! + * + * @param state The task execution state from which to deserialize the accumulators. + * @return The deserialized accumulators, of null, if there are no accumulators or an error occurred. + */ private Map> deserializeAccumulators(TaskExecutionState state) { AccumulatorSnapshot serializedAccumulators = state.getAccumulators(); - Map> accumulators = null; + if (serializedAccumulators != null) { try { - accumulators = serializedAccumulators.deserializeUserAccumulators(userClassLoader); - } catch (Exception e) { - LOG.error("Failed to deserialize final accumulator results.", e); + return serializedAccumulators.deserializeUserAccumulators(userClassLoader); + } + catch (Throwable t) { + // we catch Throwable here to include all form of linking errors that may + // occur if user classes are missing in the classpath + LOG.error("Failed to deserialize final accumulator results.", t); } } - return accumulators; + return null; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java index 82c376e1e9fa62e8f9cb4ea26ebf986a8187e5c1..668418d9ded2e597c7e5d3881c45777332c86b66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java @@ -25,7 +25,9 @@ import java.io.Serializable; * An instance of this class represents a snapshot of the io-related metrics of a single task. */ public class IOMetrics implements Serializable { + private static final long serialVersionUID = -7208093607556457183L; + protected long numRecordsIn; protected long numRecordsOut; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java index 6e3c1e880eae5e2a6707964d22a99dc32a227992..f497f8c21a45840932b558a79dde3d0db746299e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java @@ -93,4 +93,12 @@ public class JobInformation implements Serializable { public Collection getRequiredClasspathURLs() { return requiredClasspathURLs; } + + // ------------------------------------------------------------------------ + + + @Override + public String toString() { + return "JobInformation for '" + jobName + "' (" + jobId + ')'; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java index 5cc24846801d265d39299916f4ad4cc167507ac2..9395435ac9f994bcb746ed1571b29f5094fe1cdd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java @@ -49,8 +49,9 @@ public class TaskExecutionState implements Serializable { private final SerializedThrowable throwable; - /** Serialized flink and user-defined accumulators */ + /** Serialized user-defined accumulators */ private final AccumulatorSnapshot accumulators; + private final IOMetrics ioMetrics; /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index 98b4c4deeeba53aff3ce44f442acc82268cea641..81162b64d1ce457bef017a3e7089c990e0023f78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -18,14 +18,11 @@ package org.apache.flink.runtime.checkpoint; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -37,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; -import org.junit.AfterClass; + import org.junit.Test; import org.mockito.Matchers; @@ -50,13 +47,6 @@ import static org.mockito.Mockito.verify; public class ExecutionGraphCheckpointCoordinatorTest { - private static ActorSystem system = AkkaUtils.createLocalActorSystem(new Configuration()); - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - } - /** * Tests that a shut down checkpoint coordinator calls shutdown on * the store and counter. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 203c547935222b13561854770ca227fc01e1c876..5496e35dd186b2dfdae7ee14522acedd5c8a542d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -162,7 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { testingRestartStrategy, Collections.emptyList(), Collections.emptyList(), - scheduler, + scheduler, getClass().getClassLoader(), metricGroup);