提交 874d9565 编写于 作者: S Stephan Ewen

[hotfix] [dist. coordination] Small code cleanups in ExecutionGraph and related classes

上级 69843fef
......@@ -820,7 +820,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
void sendPartitionInfos() {
// check if the ExecutionVertex has already been archived and thus cleared the
// partial partition infos queue
if(partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) {
if (partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) {
PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
......@@ -931,7 +931,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
else if (currentState == CANCELING || currentState == FAILED) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", getVertexWithAttempt()));
// this log statement is guarded because the 'getVertexWithAttempt()' method
// performs string concatenations
LOG.debug("Concurrent canceling/failing of {} while deployment was in progress.", getVertexWithAttempt());
}
sendCancelRpcCall();
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
......@@ -63,7 +64,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
......@@ -92,6 +92,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
......@@ -131,7 +132,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** The lock used to secure all access to mutable fields, especially the tracking of progress
* within the job. */
private final SerializableObject progressLock = new SerializableObject();
private final Object progressLock = new Object();
/** Job specific information like the job id, job name, job configuration, etc. */
private final JobInformation jobInformation;
......@@ -222,7 +223,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Checkpoint stats tracker separate from the coordinator in order to be
* available after archiving. */
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private CheckpointStatsTracker checkpointStatsTracker;
// ------ Fields that are only relevant for archived execution graphs ------------
......@@ -235,6 +235,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/**
* This constructor is for tests only, because it does not include class loading information.
*/
@VisibleForTesting
ExecutionGraph(
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
......@@ -369,24 +370,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
CheckpointStatsTracker statsTracker) {
// simple sanity checks
if (interval < 10 || checkpointTimeout < 10) {
throw new IllegalArgumentException();
}
if (state != JobStatus.CREATED) {
throw new IllegalStateException("Job must be in CREATED state");
}
checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms");
checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
checkState(checkpointCoordinator == null, "checkpointing already enabled");
ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
// disable to make sure existing checkpoint coordinators are cleared
try {
disableSnaphotCheckpointing();
} catch (Throwable t) {
LOG.error("Error while shutting down checkpointer.");
}
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
// create the coordinator that triggers and commits checkpoints and holds the state
......@@ -416,24 +409,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
/**
* Disables checkpointing.
*
* <p>The shutdown of the checkpoint coordinator might block. Make sure that calls to this
* method don't block the job manager actor and run asynchronously.
*/
public void disableSnaphotCheckpointing() throws Exception {
if (state != JobStatus.CREATED) {
throw new IllegalStateException("Job must be in CREATED state");
}
if (checkpointCoordinator != null) {
checkpointCoordinator.shutdown(state);
checkpointCoordinator = null;
checkpointStatsTracker = null;
}
}
@Override
public CheckpointCoordinator getCheckpointCoordinator() {
return checkpointCoordinator;
......@@ -761,7 +736,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : this.tasks.values()) {
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(slotProvider, allowQueuedScheduling);
}
......@@ -932,9 +907,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
public void stop() throws StoppingException {
if(this.isStoppable) {
for(ExecutionVertex ev : this.getAllExecutionVertices()) {
if(ev.getNumberOfInputs() == 0) { // send signal to sources only
if (isStoppable) {
for (ExecutionVertex ev : this.getAllExecutionVertices()) {
if (ev.getNumberOfInputs() == 0) { // send signal to sources only
ev.stop();
}
}
......@@ -1011,7 +986,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return;
}
// no need to treat other states
// else: concurrent change to execution state, retry
}
}
......@@ -1273,35 +1248,47 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* @return True, if the task update was properly applied, false, if the execution attempt was not found.
*/
public boolean updateState(TaskExecutionState state) {
Execution attempt = this.currentExecutions.get(state.getID());
final Execution attempt = currentExecutions.get(state.getID());
if (attempt != null) {
try {
Map<String, Accumulator<?, ?>> accumulators;
switch (state.getExecutionState()) {
case RUNNING:
return attempt.switchToRunning();
case FINISHED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFinished(accumulators, state.getIOMetrics());
return true;
case CANCELED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.cancelingComplete(accumulators, state.getIOMetrics());
return true;
case FAILED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
return true;
default:
// we mark as failed and return false, which triggers the TaskManager
// to remove the task
attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
return false;
}
}
catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
switch (state.getExecutionState()) {
case RUNNING:
return attempt.switchToRunning();
case FINISHED:
try {
Map<String, Accumulator<?, ?>> userAccumulators = deserializeAccumulators(state);
attempt.markFinished(userAccumulators, state.getIOMetrics());
}
catch (Exception e) {
LOG.error("Failed to deserialize final accumulator results.", e);
attempt.markFailed(e);
}
return true;
case CANCELED:
Map<String, Accumulator<?, ?>> userAcc1 = deserializeAccumulators(state);
attempt.cancelingComplete(userAcc1, state.getIOMetrics());
return true;
case FAILED:
Map<String, Accumulator<?, ?>> userAcc2 = deserializeAccumulators(state);
attempt.markFailed(state.getError(userClassLoader), userAcc2, state.getIOMetrics());
return true;
default:
// we mark as failed and return false, which triggers the TaskManager
// to remove the task
attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
return false;
// failures during updates leave the ExecutionGraph inconsistent
fail(t);
return false;
}
}
else {
......@@ -1309,17 +1296,28 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
/**
* Deserializes accumulators from a task state update.
*
* <p>This method never throws an exception!
*
* @param state The task execution state from which to deserialize the accumulators.
* @return The deserialized accumulators, of null, if there are no accumulators or an error occurred.
*/
private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) {
AccumulatorSnapshot serializedAccumulators = state.getAccumulators();
Map<String, Accumulator<?, ?>> accumulators = null;
if (serializedAccumulators != null) {
try {
accumulators = serializedAccumulators.deserializeUserAccumulators(userClassLoader);
} catch (Exception e) {
LOG.error("Failed to deserialize final accumulator results.", e);
return serializedAccumulators.deserializeUserAccumulators(userClassLoader);
}
catch (Throwable t) {
// we catch Throwable here to include all form of linking errors that may
// occur if user classes are missing in the classpath
LOG.error("Failed to deserialize final accumulator results.", t);
}
}
return accumulators;
return null;
}
/**
......
......@@ -25,7 +25,9 @@ import java.io.Serializable;
* An instance of this class represents a snapshot of the io-related metrics of a single task.
*/
public class IOMetrics implements Serializable {
private static final long serialVersionUID = -7208093607556457183L;
protected long numRecordsIn;
protected long numRecordsOut;
......
......@@ -93,4 +93,12 @@ public class JobInformation implements Serializable {
public Collection<URL> getRequiredClasspathURLs() {
return requiredClasspathURLs;
}
// ------------------------------------------------------------------------
@Override
public String toString() {
return "JobInformation for '" + jobName + "' (" + jobId + ')';
}
}
......@@ -49,8 +49,9 @@ public class TaskExecutionState implements Serializable {
private final SerializedThrowable throwable;
/** Serialized flink and user-defined accumulators */
/** Serialized user-defined accumulators */
private final AccumulatorSnapshot accumulators;
private final IOMetrics ioMetrics;
/**
......
......@@ -18,14 +18,11 @@
package org.apache.flink.runtime.checkpoint;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
......@@ -37,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.Matchers;
......@@ -50,13 +47,6 @@ import static org.mockito.Mockito.verify;
public class ExecutionGraphCheckpointCoordinatorTest {
private static ActorSystem system = AkkaUtils.createLocalActorSystem(new Configuration());
@AfterClass
public static void teardown() {
JavaTestKit.shutdownActorSystem(system);
}
/**
* Tests that a shut down checkpoint coordinator calls shutdown on
* the store and counter.
......
......@@ -162,7 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
testingRestartStrategy,
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
scheduler,
scheduler,
getClass().getClassLoader(),
metricGroup);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册