提交 73c73e92 编写于 作者: U Ufuk Celebi 提交者: Till Rohrmann

[FLINK-2354] [runtime] Add job graph and checkpoint recovery

This closes #1153.
上级 3aaee1e5
......@@ -407,6 +407,12 @@ public final class ConfigConstants {
*/
public static final String STATE_BACKEND = "state.backend";
/**
* File system state backend base path for recoverable state handles. Recovery state is written
* to this path and the file state handles are persisted for recovery.
*/
public static final String STATE_BACKEND_FS_RECOVERY_PATH = "state.backend.fs.dir.recovery";
// ----------------------------- Miscellaneous ----------------------------
/**
......@@ -433,6 +439,15 @@ public final class ConfigConstants {
public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
/** ZooKeeper root path (ZNode) for job graphs. */
public static final String ZOOKEEPER_JOBGRAPHS_PATH = "ha.zookeeper.dir.jobgraphs";
/** ZooKeeper root path (ZNode) for completed checkpoints. */
public static final String ZOOKEEPER_CHECKPOINTS_PATH = "ha.zookeeper.dir.checkpoints";
/** ZooKeeper root path (ZNode) for checkpoint counters. */
public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "ha.zookeeper.dir.checkpoint-counter";
public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
......@@ -699,6 +714,12 @@ public final class ConfigConstants {
public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";
public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";
public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";
public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;
......
......@@ -21,16 +21,16 @@ package org.apache.flink.runtime.checkpoint;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
......@@ -38,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
......@@ -48,13 +47,19 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* The checkpoint coordinator coordinates the distributed snapshots of operators and state.
* It triggers the checkpoint by sending the messages to the relevant tasks and collects the
* checkpoint acknowledgements. It also collects and maintains the overview of the state handles
* reported by the tasks that acknowledge the checkpoint.
*
* <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link
* CompletedCheckpointStore} and {@link CheckpointIDCounter} changes. The default standalone
* implementations don't support any recovery.
*/
public class CheckpointCoordinator {
......@@ -79,12 +84,20 @@ public class CheckpointCoordinator {
private final ExecutionVertex[] tasksToCommitTo;
private final Map<Long, PendingCheckpoint> pendingCheckpoints;
private final ArrayDeque<SuccessfulCheckpoint> completedCheckpoints;
/**
* Completed checkpoints. Implementations can be blocking. Make sure calls to methods
* accessing this don't block the job manager actor and run asynchronously.
*/
private final CompletedCheckpointStore completedCheckpointStore;
private final ArrayDeque<Long> recentPendingCheckpoints;
private final AtomicLong checkpointIdCounter = new AtomicLong(1);
/**
* Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
* need to be ascending across job managers.
*/
private final CheckpointIDCounter checkpointIdCounter;
private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
......@@ -93,8 +106,6 @@ public class CheckpointCoordinator {
private final long checkpointTimeout;
private final int numSuccessfulCheckpointsToRetain;
private TimerTask periodicScheduler;
private ActorGateway jobStatusListener;
......@@ -110,61 +121,62 @@ public class CheckpointCoordinator {
public CheckpointCoordinator(
JobID job,
int numSuccessfulCheckpointsToRetain,
long checkpointTimeout,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
ClassLoader userClassLoader) {
ClassLoader userClassLoader,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
RecoveryMode recoveryMode) throws Exception {
// some sanity checks
if (job == null || tasksToTrigger == null ||
tasksToWaitFor == null || tasksToCommitTo == null) {
throw new NullPointerException();
}
if (numSuccessfulCheckpointsToRetain < 1) {
throw new IllegalArgumentException("Must retain at least one successful checkpoint");
}
if (checkpointTimeout < 1) {
throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
}
// Sanity check
checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
this.job = job;
this.numSuccessfulCheckpointsToRetain = numSuccessfulCheckpointsToRetain;
this.job = checkNotNull(job);
this.checkpointTimeout = checkpointTimeout;
this.tasksToTrigger = tasksToTrigger;
this.tasksToWaitFor = tasksToWaitFor;
this.tasksToCommitTo = tasksToCommitTo;
this.tasksToTrigger = checkNotNull(tasksToTrigger);
this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
this.completedCheckpoints = new ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
this.userClassLoader = userClassLoader;
this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
checkpointIDCounter.start();
timer = new Timer("Checkpoint Timer", true);
this.timer = new Timer("Checkpoint Timer", true);
// Add shutdown hook to clean up state handles
shutdownHook = new Thread(new Runnable() {
@Override
public void run() {
try {
CheckpointCoordinator.this.shutdown();
}
catch (Throwable t) {
LOG.error("Error during shutdown of blob service via JVM shutdown hook: " +
t.getMessage(), t);
if (recoveryMode == RecoveryMode.STANDALONE) {
// Add shutdown hook to clean up state handles when no checkpoint recovery is
// possible. In case of another configured recovery mode, the checkpoints need to be
// available for the standby job managers.
this.shutdownHook = new Thread(new Runnable() {
@Override
public void run() {
try {
CheckpointCoordinator.this.shutdown();
}
catch (Throwable t) {
LOG.error("Error during shutdown of checkpoint coordniator via " +
"JVM shutdown hook: " + t.getMessage(), t);
}
}
}
});
});
try {
// Add JVM shutdown hook to call shutdown of service
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
catch (IllegalStateException ignored) {
// JVM is already shutting down. No need to do anything.
try {
// Add JVM shutdown hook to call shutdown of service
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
catch (IllegalStateException ignored) {
// JVM is already shutting down. No need to do anything.
}
catch (Throwable t) {
LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
}
}
catch (Throwable t) {
LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
else {
this.shutdownHook = null;
}
}
......@@ -178,41 +190,39 @@ public class CheckpointCoordinator {
* After this method has been called, the coordinator does not accept any further
* messages and cannot trigger any further checkpoints.
*/
public void shutdown() {
public void shutdown() throws Exception {
synchronized (lock) {
try {
if (shutdown) {
return;
}
shutdown = true;
LOG.info("Stopping checkpoint coordinator for job " + job);
// shut down the thread that handles the timeouts
timer.cancel();
// make sure that the actor does not linger
if (jobStatusListener != null) {
jobStatusListener.tell(PoisonPill.getInstance());
jobStatusListener = null;
}
// the scheduling thread needs also to go away
if (periodicScheduler != null) {
periodicScheduler.cancel();
periodicScheduler = null;
}
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
pending.discard(userClassLoader, true);
}
pendingCheckpoints.clear();
// clean and discard all successful checkpoints
for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
checkpoint.discard(userClassLoader);
try {
if (!shutdown) {
shutdown = true;
LOG.info("Stopping checkpoint coordinator for job " + job);
// shut down the thread that handles the timeouts
timer.cancel();
// make sure that the actor does not linger
if (jobStatusListener != null) {
jobStatusListener.tell(PoisonPill.getInstance());
jobStatusListener = null;
}
// the scheduling thread needs also to go away
if (periodicScheduler != null) {
periodicScheduler.cancel();
periodicScheduler = null;
}
checkpointIdCounter.stop();
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
pending.discard(userClassLoader, true);
}
pendingCheckpoints.clear();
// clean and discard all successful checkpoints
completedCheckpointStore.discardAllCheckpoints();
}
completedCheckpoints.clear();
}
finally {
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
......@@ -244,7 +254,7 @@ public class CheckpointCoordinator {
* Triggers a new checkpoint and uses the current system time as the
* checkpoint time.
*/
public void triggerCheckpoint() {
public void triggerCheckpoint() throws Exception {
triggerCheckpoint(System.currentTimeMillis());
}
......@@ -254,7 +264,7 @@ public class CheckpointCoordinator {
*
* @param timestamp The timestamp for the checkpoint.
*/
public boolean triggerCheckpoint(final long timestamp) {
public boolean triggerCheckpoint(final long timestamp) throws Exception {
if (shutdown) {
LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown.");
return false;
......@@ -354,7 +364,7 @@ public class CheckpointCoordinator {
}
}
public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) {
public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
if (shutdown || message == null) {
return;
}
......@@ -365,7 +375,7 @@ public class CheckpointCoordinator {
final long checkpointId = message.getCheckpointId();
SuccessfulCheckpoint completed = null;
CompletedCheckpoint completed = null;
PendingCheckpoint checkpoint;
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
......@@ -380,13 +390,13 @@ public class CheckpointCoordinator {
if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
if (checkpoint.isFullyAcknowledged()) {
completed = checkpoint.toCompletedCheckpoint();
completedCheckpointStore.addCheckpoint(completed);
LOG.info("Completed checkpoint " + checkpointId);
LOG.debug(completed.getStates().toString());
completed = checkpoint.toCompletedCheckpoint();
completedCheckpoints.addLast(completed);
if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
completedCheckpoints.removeFirst().discard(userClassLoader);
}
pendingCheckpoints.remove(checkpointId);
rememberRecentCheckpointId(checkpointId);
......@@ -456,25 +466,30 @@ public class CheckpointCoordinator {
// Checkpoint State Restoring
// --------------------------------------------------------------------------------------------
public void restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> tasks,
boolean errorIfNoCheckpoint,
boolean allOrNothingState) throws Exception {
public void restoreLatestCheckpointedState(
Map<JobVertexID, ExecutionJobVertex> tasks,
boolean errorIfNoCheckpoint,
boolean allOrNothingState) throws Exception {
synchronized (lock) {
if (shutdown) {
throw new IllegalStateException("CheckpointCoordinator is shut down");
}
if (completedCheckpoints.isEmpty()) {
// Recover the checkpoints
completedCheckpointStore.recover();
// restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
if (latest == null) {
if (errorIfNoCheckpoint) {
throw new IllegalStateException("No completed checkpoint available");
} else {
return;
}
}
// restore from the latest checkpoint
SuccessfulCheckpoint latest = completedCheckpoints.getLast();
if (allOrNothingState) {
Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>();
......@@ -519,7 +534,9 @@ public class CheckpointCoordinator {
}
public int getNumberOfRetainedSuccessfulCheckpoints() {
return this.completedCheckpoints.size();
synchronized (lock) {
return completedCheckpointStore.getNumberOfRetainedCheckpoints();
}
}
public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
......@@ -528,9 +545,9 @@ public class CheckpointCoordinator {
}
}
public List<SuccessfulCheckpoint> getSuccessfulCheckpoints() {
public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
synchronized (lock) {
return new ArrayList<SuccessfulCheckpoint>(this.completedCheckpoints);
return completedCheckpointStore.getAllCheckpoints();
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
/**
 * Source of checkpoint IDs.
 *
 * <p>The counter is a small service with an explicit life cycle: it is started via
 * {@link #start()}, hands out IDs via {@link #getAndIncrement()} and is shut down via
 * {@link #stop()}.
 */
public interface CheckpointIDCounter {

    /**
     * Starts the {@link CheckpointIDCounter} service.
     *
     * @throws Exception if the implementation fails to start
     */
    void start() throws Exception;

    /**
     * Stops the {@link CheckpointIDCounter} service.
     *
     * @throws Exception if the implementation fails to stop
     */
    void stop() throws Exception;

    /**
     * Atomically advances the checkpoint ID by one.
     *
     * @return the checkpoint ID as it was <em>before</em> the increment
     * @throws Exception if the implementation fails to update the counter
     */
    long getAndIncrement() throws Exception;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
/**
 * Factory that creates the checkpoint recovery components needed for a single job.
 */
public interface CheckpointRecoveryFactory {

    /**
     * How many {@link CompletedCheckpoint} instances are retained.
     */
    int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;

    /**
     * Starts the {@link CheckpointRecoveryFactory} service.
     */
    void start();

    /**
     * Stops the {@link CheckpointRecoveryFactory} service.
     */
    void stop();

    /**
     * Creates the {@link CompletedCheckpointStore} for the given job.
     *
     * @param jobId           ID of the job whose checkpoints are to be recovered
     * @param userClassLoader user code class loader of that job
     * @return the {@link CompletedCheckpointStore} instance for the job
     * @throws Exception if the store cannot be created
     */
    CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
            throws Exception;

    /**
     * Creates the {@link CheckpointIDCounter} for the given job.
     *
     * @param jobId ID of the job whose checkpoints are to be recovered
     * @return the {@link CheckpointIDCounter} instance for the job
     * @throws Exception if the counter cannot be created
     */
    CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;
}
......@@ -19,29 +19,28 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
* and that is considered completed.
*/
public class SuccessfulCheckpoint {
private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class);
public class CompletedCheckpoint implements Serializable {
private static final long serialVersionUID = -8360248179615702014L;
private final JobID job;
private final long checkpointID;
private final long timestamp;
private final List<StateForTask> states;
private final ArrayList<StateForTask> states;
public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) {
public CompletedCheckpoint(JobID job, long checkpointID, long timestamp, ArrayList<StateForTask> states) {
this.job = job;
this.checkpointID = checkpointID;
this.timestamp = timestamp;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import java.util.List;
/**
 * A bounded store of {@link CompletedCheckpoint} instances.
 *
 * <p>Checkpoints are added one at a time; once the bound is exceeded the oldest checkpoint
 * is discarded, and the most recently added one is available via
 * {@link #getLatestCheckpoint()}.
 */
public interface CompletedCheckpointStore {

    /**
     * Recovers the available {@link CompletedCheckpoint} instances.
     *
     * <p>Once this method has returned, {@link #getLatestCheckpoint()} yields the latest
     * available checkpoint.
     *
     * @throws Exception if recovery fails
     */
    void recover() throws Exception;

    /**
     * Appends a {@link CompletedCheckpoint} to the completed checkpoints.
     *
     * <p>The number of retained checkpoints is bounded. If adding this checkpoint exceeds
     * the maximum, the oldest checkpoint is discarded via
     * {@link CompletedCheckpoint#discard(ClassLoader)}.
     *
     * @param checkpoint the completed checkpoint to add
     * @throws Exception if the checkpoint cannot be added
     */
    void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;

    /**
     * Returns the latest {@link CompletedCheckpoint}.
     *
     * @return the latest checkpoint, or <code>null</code> if none was added
     * @throws Exception if the checkpoint cannot be retrieved
     */
    CompletedCheckpoint getLatestCheckpoint() throws Exception;

    /**
     * Discards every added {@link CompletedCheckpoint} via
     * {@link CompletedCheckpoint#discard(ClassLoader)}.
     *
     * @throws Exception if discarding fails
     */
    void discardAllCheckpoints() throws Exception;

    /**
     * Returns all {@link CompletedCheckpoint} instances.
     *
     * @return all checkpoints; an empty list if no checkpoint has been added yet
     * @throws Exception if the checkpoints cannot be retrieved
     */
    List<CompletedCheckpoint> getAllCheckpoints() throws Exception;

    /**
     * Returns how many checkpoints are currently retained.
     *
     * @return the current number of retained checkpoints
     */
    int getNumberOfRetainedCheckpoints();
}
......@@ -31,7 +31,7 @@ import org.apache.flink.util.SerializedValue;
/**
* A pending checkpoint is a checkpoint that has been started, but has not been
* acknowledged by all tasks that need to acknowledge it. Once all tasks have
* acknowledged it, it becomes a {@link SuccessfulCheckpoint}.
* acknowledged it, it becomes a {@link CompletedCheckpoint}.
*
* <p>Note that the pending checkpoint, as well as the successful checkpoint keep the
* state handles always as serialized values, never as actual values.</p>
......@@ -109,13 +109,13 @@ public class PendingCheckpoint {
return collectedStates;
}
public SuccessfulCheckpoint toCompletedCheckpoint() {
public CompletedCheckpoint toCompletedCheckpoint() {
synchronized (lock) {
if (discarded) {
throw new IllegalStateException("pending checkpoint is discarded");
}
if (notYetAcknowledgedTasks.isEmpty()) {
SuccessfulCheckpoint completed = new SuccessfulCheckpoint(jobId, checkpointId,
CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId,
checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
discard(null, false);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import java.util.concurrent.atomic.AtomicLong;
/**
 * {@link CheckpointIDCounter} for JobManagers running in {@link RecoveryMode#STANDALONE}.
 *
 * <p>This is a thin wrapper around an {@link AtomicLong}, which is enough here because job
 * managers are not recoverable in this recovery mode.
 */
public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {

    /** The next checkpoint ID to hand out; counting starts at 1. */
    private final AtomicLong nextCheckpointId = new AtomicLong(1);

    /** No-op. */
    @Override
    public void start() throws Exception {
        // nothing to start
    }

    /** No-op. */
    @Override
    public void stop() throws Exception {
        // nothing to stop
    }

    /**
     * Atomically increments the counter.
     *
     * @return the checkpoint ID before the increment
     */
    @Override
    public long getAndIncrement() throws Exception {
        final long previousId = nextCheckpointId.getAndIncrement();
        return previousId;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
/**
 * Factory for the {@link CheckpointCoordinator} components used in
 * {@link RecoveryMode#STANDALONE}.
 */
public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFactory {

    /** No-op: this factory has nothing to start. */
    @Override
    public void start() {
    }

    /** No-op: this factory has nothing to stop. */
    @Override
    public void stop() {
    }

    /**
     * Creates a new {@link StandaloneCompletedCheckpointStore} that retains
     * {@link CheckpointRecoveryFactory#NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN} checkpoints.
     *
     * @param jobId           job ID (not used by this implementation)
     * @param userClassLoader user code class loader handed to the store
     * @return a fresh store instance
     */
    @Override
    public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
            throws Exception {

        final int checkpointsToRetain =
                CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN;

        return new StandaloneCompletedCheckpointStore(checkpointsToRetain, userClassLoader);
    }

    /**
     * Creates a new {@link StandaloneCheckpointIDCounter}.
     *
     * @param ignored job ID (not used by this implementation)
     * @return a fresh counter instance
     */
    @Override
    public CheckpointIDCounter createCheckpointIDCounter(JobID ignored) {
        return new StandaloneCheckpointIDCounter();
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
 *
 * <p>Checkpoints are kept in an in-memory {@link ArrayDeque}, oldest first.
 */
class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {

    /** Upper bound on the number of retained checkpoints (at least 1). */
    private final int retentionLimit;

    /** User class loader passed to {@link CompletedCheckpoint#discard(ClassLoader)}. */
    private final ClassLoader userClassLoader;

    /** Retained checkpoints; the head is the oldest, the tail the latest. */
    private final ArrayDeque<CompletedCheckpoint> retained;

    /**
     * Creates a {@link StandaloneCompletedCheckpointStore}.
     *
     * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
     *                                       least 1). Adding more checkpoints than this
     *                                       causes the oldest ones to be discarded.
     * @param userClassLoader                The user class loader used to discard checkpoints
     */
    public StandaloneCompletedCheckpointStore(
            int maxNumberOfCheckpointsToRetain,
            ClassLoader userClassLoader) {

        checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");

        this.retentionLimit = maxNumberOfCheckpointsToRetain;
        this.userClassLoader = checkNotNull(userClassLoader, "User class loader");

        // One extra slot: a new checkpoint is appended before the oldest one is evicted.
        this.retained = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
    }

    /** No-op. */
    @Override
    public void recover() throws Exception {
        // Nothing to do
    }

    /**
     * Appends the checkpoint and, if the retention limit is now exceeded, discards the
     * oldest retained checkpoint.
     */
    @Override
    public void addCheckpoint(CompletedCheckpoint checkpoint) {
        retained.addLast(checkpoint);

        if (retained.size() <= retentionLimit) {
            return;
        }

        CompletedCheckpoint oldest = retained.removeFirst();
        oldest.discard(userClassLoader);
    }

    /** Returns the most recently added checkpoint, or {@code null} if there is none. */
    @Override
    public CompletedCheckpoint getLatestCheckpoint() {
        // The deque never holds null elements, so null here always means "empty".
        return retained.peekLast();
    }

    /** Returns a new list holding the retained checkpoints, oldest first. */
    @Override
    public List<CompletedCheckpoint> getAllCheckpoints() {
        List<CompletedCheckpoint> snapshot = new ArrayList<>(retained);
        return snapshot;
    }

    /** Returns the number of currently retained checkpoints. */
    @Override
    public int getNumberOfRetainedCheckpoints() {
        return retained.size();
    }

    /** Discards every retained checkpoint and then empties the store. */
    @Override
    public void discardAllCheckpoints() {
        for (CompletedCheckpoint completed : retained) {
            completed.discard(userClassLoader);
        }

        retained.clear();
    }
}
......@@ -24,6 +24,11 @@ import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Simple bean to describe the state belonging to a parallel operator.
* Since we hold the state across execution attempts, we identify a task by its
......@@ -34,8 +39,10 @@ import org.slf4j.LoggerFactory;
* Furthermore, the state may involve user-defined classes that are not accessible without
* the respective classloader.
*/
public class StateForTask {
public class StateForTask implements Serializable {
private static final long serialVersionUID = -2394696997971923995L;
private static final Logger LOG = LoggerFactory.getLogger(StateForTask.class);
/** The state of the parallel operator */
......@@ -48,12 +55,10 @@ public class StateForTask {
private final int subtask;
public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID operatorId, int subtask) {
if (state == null || operatorId == null || subtask < 0) {
throw new IllegalArgumentException();
}
this.state = state;
this.operatorId = operatorId;
this.state = checkNotNull(state, "State");
this.operatorId = checkNotNull(operatorId, "Operator ID");
checkArgument(subtask >= 0, "Negative subtask index");
this.subtask = subtask;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
 *
 * <p>Each counter creates a ZNode:
 * <pre>
 * +----O /flink/checkpoint-counter/&lt;job-id&gt; 1 [persistent]
 * .
 * .
 * .
 * +----O /flink/checkpoint-counter/&lt;job-id&gt; N [persistent]
 * </pre>
 *
 * <p>The checkpoint IDs are required to be ascending (per job). In order to guarantee this in
 * case of job manager failures we use ZooKeeper to have a shared counter across job manager
 * instances.
 */
public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {

    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class);

    /** Curator ZooKeeper client */
    private final CuratorFramework client;

    /** Path of the shared count */
    private final String counterPath;

    /** Curator recipe for shared counts */
    private final SharedCount sharedCount;

    /** Connection state listener to monitor the client connection */
    private final SharedCountConnectionStateListener connStateListener =
            new SharedCountConnectionStateListener();

    /**
     * Creates a {@link ZooKeeperCheckpointIDCounter} instance.
     *
     * @param client      Curator ZooKeeper client
     * @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job.
     * @throws Exception declared for callers; the visible body only throws
     *                   {@link NullPointerException} for {@code null} arguments
     */
    public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath) throws Exception {
        this.client = checkNotNull(client, "Curator client");
        this.counterPath = checkNotNull(counterPath, "Counter path");
        // The shared count is seeded with 1.
        this.sharedCount = new SharedCount(client, counterPath, 1);
    }

    /**
     * Starts the shared count and registers the connection state listener with the client.
     */
    @Override
    public void start() throws Exception {
        sharedCount.start();
        client.getConnectionStateListenable().addListener(connStateListener);
    }

    /**
     * Closes the shared count, unregisters the connection state listener and removes the
     * counter path from ZooKeeper (the delete is issued in the background).
     */
    @Override
    public void stop() throws Exception {
        try {
            sharedCount.close();
        }
        finally {
            // Always unregister, otherwise a failing close() would leave the listener
            // attached to the client.
            client.getConnectionStateListenable().removeListener(connStateListener);
        }

        LOG.info("Removing {} from ZooKeeper", counterPath);
        client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
    }

    /**
     * Atomically increments the shared count using a compare-and-set retry loop.
     *
     * @return the counter value before the increment
     * @throws IllegalStateException if the ZooKeeper connection is currently suspended or
     *                               lost, or if the counter has reached
     *                               {@link Integer#MAX_VALUE} and cannot be incremented
     *                               without wrapping around
     */
    @Override
    public long getAndIncrement() throws Exception {
        while (true) {
            ConnectionState connState = connStateListener.getLastState();

            if (connState != null) {
                throw new IllegalStateException("Connection state: " + connState);
            }

            VersionedValue<Integer> current = sharedCount.getVersionedValue();
            int previousCount = current.getValue();

            // The shared count is an int. Incrementing past MAX_VALUE would wrap to a
            // negative number and break the ascending-ID guarantee, so fail loudly instead.
            if (previousCount == Integer.MAX_VALUE) {
                throw new IllegalStateException("Checkpoint counter overflow. ZooKeeper " +
                        "checkpoint counter only supports checkpoint IDs up to " +
                        Integer.MAX_VALUE + ".");
            }

            // Retry if the compare-and-set did not succeed.
            if (sharedCount.trySetCount(current, previousCount + 1)) {
                return previousCount;
            }
        }
    }

    /**
     * Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link
     * ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper.
     *
     * <p>The recorded error state is cleared again once the client reports
     * {@link ConnectionState#CONNECTED} or {@link ConnectionState#RECONNECTED}; otherwise a
     * single transient connection problem would make the counter unusable for good.
     */
    private static final class SharedCountConnectionStateListener implements ConnectionStateListener {

        /** Last problematic connection state, or {@code null} if the connection is fine. */
        private volatile ConnectionState lastState;

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                lastState = newState;
            }
            else if (newState == ConnectionState.CONNECTED
                    || newState == ConnectionState.RECONNECTED) {
                // Connection is healthy again; allow the counter to be used.
                lastState = null;
            }
        }

        private ConnectionState getLastState() {
            return lastState;
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * Factory for the {@link CheckpointCoordinator} components used in {@link
 * RecoveryMode#ZOOKEEPER}: the completed checkpoint store and the checkpoint ID counter.
 */
public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFactory {

    /** Curator client handed to every created component. */
    private final CuratorFramework client;

    /** Configuration handed to every created component. */
    private final Configuration config;

    /**
     * @param client Curator ZooKeeper client (must not be {@code null})
     * @param config Configuration (must not be {@code null})
     */
    public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, Configuration config) {
        this.client = checkNotNull(client, "Curator client");
        this.config = checkNotNull(config, "Configuration");
    }

    /** No-op: this factory needs no start-up work. */
    @Override
    public void start() {
        // Nothing to do
    }

    /** Closes the Curator client this factory was created with. */
    @Override
    public void stop() {
        this.client.close();
    }

    /**
     * Creates the ZooKeeper backed {@link CompletedCheckpointStore} for the given job by
     * delegating to {@link ZooKeeperUtils}.
     */
    @Override
    public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
            throws Exception {

        final CompletedCheckpointStore store = ZooKeeperUtils.createCompletedCheckpoints(
                this.client,
                this.config,
                jobId,
                NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
                userClassLoader);

        return store;
    }

    /**
     * Creates the ZooKeeper backed {@link CheckpointIDCounter} for the given job by delegating
     * to {@link ZooKeeperUtils}.
     */
    @Override
    public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception {
        final CheckpointIDCounter counter =
                ZooKeeperUtils.createCheckpointIDCounter(this.client, this.config, jobID);

        return counter;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
 *
 * <p>Checkpoints are added under a ZNode per job. The name of each child node is the checkpoint
 * ID, zero-padded to 19 digits, so that ordering the names as strings is the same as ordering
 * the checkpoint IDs numerically:
 * <pre>
 * +----O /flink/checkpoints/&lt;job-id&gt;  [persistent]
 * .    |
 * .    +----O /flink/checkpoints/&lt;job-id&gt;/0000000000000000001 [persistent]
 * .    .                                  .
 * .    .                                  .
 * .    .                                  .
 * .    +----O /flink/checkpoints/&lt;job-id&gt;/000000000000000000N [persistent]
 * </pre>
 *
 * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one,
 * only the latest one is used and older ones are discarded (even if the maximum number
 * of retained checkpoints is greater than one).
 *
 * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the
 * same program, it is OK to take any valid successful checkpoint as long as the "history" of
 * checkpoints is consistent. Currently, after recovery we start out with only a single
 * checkpoint to circumvent those situations.
 */
public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {

    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);

    /** Curator ZooKeeper client */
    private final CuratorFramework client;

    /** Completed checkpoints in ZooKeeper */
    private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;

    /** The maximum number of checkpoints to retain (at least 1). */
    private final int maxNumberOfCheckpointsToRetain;

    /** User class loader for discarding {@link CompletedCheckpoint} instances. */
    private final ClassLoader userClassLoader;

    /** Local completed checkpoints. */
    private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;

    /**
     * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
     *
     * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
     *                                       least 1). Adding more checkpoints than this results
     *                                       in older checkpoints being discarded. On recovery,
     *                                       we will only start with a single checkpoint.
     * @param userClassLoader                The user class loader used to discard checkpoints
     * @param client                         The Curator ZooKeeper client
     * @param checkpointsPath                The ZooKeeper path for the checkpoints (needs to
     *                                       start with a '/')
     * @param stateHandleProvider            The state handle provider for checkpoints
     * @throws Exception If the checkpoints path cannot be ensured in ZooKeeper
     */
    public ZooKeeperCompletedCheckpointStore(
            int maxNumberOfCheckpointsToRetain,
            ClassLoader userClassLoader,
            CuratorFramework client,
            String checkpointsPath,
            StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception {

        checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");

        this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
        this.userClassLoader = checkNotNull(userClassLoader, "User class loader");

        checkNotNull(client, "Curator client");
        checkNotNull(checkpointsPath, "Checkpoints path");
        checkNotNull(stateHandleProvider, "State handle provider");

        // Ensure that the checkpoints path exists
        client.newNamespaceAwareEnsurePath(checkpointsPath)
                .ensure(client.getZookeeperClient());

        // All operations will have the path as root
        this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);

        this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(
                this.client, stateHandleProvider);

        this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);

        LOG.info("Initialized in '{}'.", checkpointsPath);
    }

    /**
     * Gets the latest checkpoint from ZooKeeper and removes all others.
     *
     * <p><strong>Important</strong>: Even if there are more than one checkpoint in ZooKeeper,
     * this will only recover the latest and discard the others. Otherwise, there is no guarantee
     * that the history of checkpoints is consistent.
     *
     * <p>Locally tracked checkpoints are dropped before recovering, so calling this method
     * repeatedly does not accumulate duplicate or stale entries.
     */
    @Override
    public void recover() throws Exception {
        LOG.info("Recovering checkpoints from ZooKeeper.");

        // The local handles have to reflect the state in ZooKeeper. Without clearing them,
        // every further recovery would append the latest checkpoint once more.
        checkpointStateHandles.clear();

        // Get all there is first
        List<Tuple2<StateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
        while (true) {
            try {
                initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
                break;
            }
            catch (ConcurrentModificationException e) {
                LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
            }
        }

        int numberOfInitialCheckpoints = initialCheckpoints.size();

        LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);

        if (numberOfInitialCheckpoints > 0) {
            // Take the last one. This is the latest checkpoint, because the zero-padded path
            // names sort in the order of the checkpoint IDs.
            Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
                    .get(numberOfInitialCheckpoints - 1);

            CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);

            checkpointStateHandles.add(latest);

            LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint);

            for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
                try {
                    removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
                }
                catch (Exception e) {
                    // Best effort: a failed clean up must not fail the recovery
                    LOG.error("Failed to discard checkpoint", e);
                }
            }
        }
    }

    /**
     * Synchronously writes the new checkpoints to ZooKeeper and asynchronously removes older ones.
     *
     * @param checkpoint Completed checkpoint to add.
     */
    @Override
    public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
        checkNotNull(checkpoint, "Checkpoint");

        // First add the new one. If it fails, we don't want to lose existing data.
        //
        // The ID is zero-padded to 19 digits (the length of Long.MAX_VALUE). Recovery picks
        // the latest checkpoint by name order; unpadded names would order "/10" before "/9".
        String path = String.format("/%019d", checkpoint.getCheckpointID());

        final StateHandle<CompletedCheckpoint> stateHandle = checkpointsInZooKeeper.add(path, checkpoint);

        checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));

        // Everything worked, let's remove a previous checkpoint if necessary.
        if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
            removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst());
        }

        LOG.debug("Added {} to {}.", checkpoint, path);
    }

    /**
     * Returns the most recently added checkpoint or {@code null} if none is tracked locally.
     */
    @Override
    public CompletedCheckpoint getLatestCheckpoint() throws Exception {
        if (checkpointStateHandles.isEmpty()) {
            return null;
        }
        else {
            return checkpointStateHandles.getLast().f0.getState(userClassLoader);
        }
    }

    /**
     * Returns all locally tracked checkpoints, oldest first.
     */
    @Override
    public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
        List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());

        for (Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandle : checkpointStateHandles) {
            checkpoints.add(stateHandle.f0.getState(userClassLoader));
        }

        return checkpoints;
    }

    /**
     * Returns the number of locally tracked checkpoints.
     */
    @Override
    public int getNumberOfRetainedCheckpoints() {
        return checkpointStateHandles.size();
    }

    /**
     * Discards all locally tracked checkpoints and removes the checkpoints ZNode of this store
     * (including its children) from ZooKeeper.
     */
    @Override
    public void discardAllCheckpoints() throws Exception {
        for (Tuple2<StateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
            try {
                removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
            }
            catch (Exception e) {
                // Best effort: keep going with the remaining checkpoints
                LOG.error("Failed to discard checkpoint.", e);
            }
        }

        checkpointStateHandles.clear();

        String path = "/" + client.getNamespace();

        LOG.info("Removing {} from ZooKeeper", path);
        ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
    }

    /**
     * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
     *
     * <p>The removal is issued with a background callback. The checkpoint and its state handle
     * are only discarded in that callback after the delete was reported as successful. Failures
     * inside the callback are logged and not propagated.
     */
    private void removeFromZooKeeperAndDiscardCheckpoint(
            final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {

        final BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                try {
                    if (event.getType() == CuratorEventType.DELETE) {
                        if (event.getResultCode() == 0) {
                            // Discard the checkpoint
                            CompletedCheckpoint checkpoint = stateHandleAndPath
                                    .f0.getState(userClassLoader);
                            checkpoint.discard(userClassLoader);

                            // Discard the state handle
                            stateHandleAndPath.f0.discardState();

                            LOG.debug("Discarded {}", checkpoint);
                        }
                        else {
                            throw new IllegalStateException("Unexpected result code " +
                                    event.getResultCode() + " in '" + event + "' callback.");
                        }
                    }
                    else {
                        throw new IllegalStateException("Unexpected event type " +
                                event.getType() + " in '" + event + "' callback.");
                    }
                }
                catch (Exception e) {
                    LOG.error("Failed to discard checkpoint.", e);
                }
            }
        };

        // Remove state handle from ZooKeeper first. If this fails, we can still recover, but if
        // we remove a state handle and fail to remove it from ZooKeeper, we end up in an
        // inconsistent state.
        checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
    }
}
......@@ -30,6 +30,8 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
......@@ -39,6 +41,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
......@@ -110,8 +113,6 @@ public class ExecutionGraph implements Serializable {
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
// --------------------------------------------------------------------------------------------
/** The lock used to secure all access to mutable fields, especially the tracking of progress
......@@ -347,7 +348,11 @@ public class ExecutionGraph implements Serializable {
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
ActorSystem actorSystem,
UUID leaderSessionID) {
UUID leaderSessionID,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
RecoveryMode recoveryMode) throws Exception {
// simple sanity checks
if (interval < 10 || checkpointTimeout < 10) {
throw new IllegalArgumentException();
......@@ -367,12 +372,14 @@ public class ExecutionGraph implements Serializable {
snapshotCheckpointsEnabled = true;
checkpointCoordinator = new CheckpointCoordinator(
jobID,
NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
checkpointTimeout,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
userClassLoader);
userClassLoader,
checkpointIDCounter,
completedCheckpointStore,
recoveryMode);
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
......@@ -382,8 +389,14 @@ public class ExecutionGraph implements Serializable {
interval,
leaderSessionID));
}
public void disableSnaphotCheckpointing() {
/**
* Disables checkpointing.
*
* <p>The shutdown of the checkpoint coordinator might block. Make sure that calls to this
* method don't block the job manager actor and run asynchronously.
*/
public void disableSnaphotCheckpointing() throws Exception {
if (state != JobStatus.CREATED) {
throw new IllegalStateException("Job must be in CREATED state");
}
......@@ -772,6 +785,20 @@ public class ExecutionGraph implements Serializable {
}
}
/**
 * Restores the latest checkpointed state through the checkpoint coordinator. Does nothing if
 * no checkpoint coordinator is set.
 *
 * <p>The recovery of checkpoints might block. Make sure that calls to this method don't
 * block the job manager actor and run asynchronously.
 *
 * @throws Exception If the checkpoint coordinator fails to restore the state
 */
public void restoreLatestCheckpointedState() throws Exception {
    synchronized (progressLock) {
        if (checkpointCoordinator == null) {
            // No checkpoint coordinator, nothing to restore
            return;
        }

        checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
    }
}
/**
* This method cleans fields that are irrelevant for the archived execution attempt.
*/
......@@ -886,7 +913,13 @@ public class ExecutionGraph implements Serializable {
}
}, executionContext);
} else {
restart();
future(new Callable<Object>() {
@Override
public Object call() throws Exception {
restart();
return null;
}
}, executionContext);
}
break;
}
......@@ -906,7 +939,7 @@ public class ExecutionGraph implements Serializable {
}
}
}
private void postRunCleanup() {
try {
CheckpointCoordinator coord = this.checkpointCoordinator;
......
......@@ -538,4 +538,9 @@ public class JobGraph implements Serializable {
}
}
}
/**
 * Returns a short description of this job graph consisting of its job ID.
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("JobGraph(jobId: ");
    description.append(jobID);
    description.append(")");
    return description.toString();
}
}
......@@ -34,6 +34,19 @@ public enum RecoveryMode {
STANDALONE,
ZOOKEEPER;
/**
 * Return the configured {@link RecoveryMode}.
 *
 * <p>The configured value is matched case-insensitively against the enum constants. The
 * upper-casing uses {@link java.util.Locale#ROOT}, so the result does not depend on the
 * default locale of the JVM (e.g. the dotted capital I of the Turkish locale).
 *
 * @param config The config to parse
 * @return Configured recovery mode or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not
 * configured.
 * @throws IllegalArgumentException If the configured value does not name a recovery mode
 */
public static RecoveryMode fromConfig(Configuration config) {
    String configuredMode = config.getString(
            ConfigConstants.RECOVERY_MODE,
            ConfigConstants.DEFAULT_RECOVERY_MODE);

    return RecoveryMode.valueOf(configuredMode.toUpperCase(java.util.Locale.ROOT));
}
/**
* Returns true if the defined recovery mode supports high availability.
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import scala.Option;
import java.util.Collections;
import java.util.List;
/**
 * {@link SubmittedJobGraphStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
 *
 * <p>Every operation is a no-op and the recover methods return empty results, because
 * {@link JobGraph} instances cannot be recovered in this recovery mode.
 */
public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore {

    /** No-op; the listener is ignored. */
    @Override
    public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
        // Nothing to do
    }

    /** No-op. */
    @Override
    public void stop() {
        // Nothing to do
    }

    /** Always returns an empty list. */
    @Override
    public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
        return Collections.emptyList();
    }

    /** Always returns an empty option. */
    @Override
    public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
        return Option.empty();
    }

    /** No-op; the job graph is not stored. */
    @Override
    public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
        // Nothing to do
    }

    /** No-op. */
    @Override
    public void removeJobGraph(JobID jobId) throws Exception {
        // Nothing to do
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import java.io.Serializable;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * Serializable pair of a submitted {@link JobGraph} and the {@link JobInfo} that belongs to it,
 * kept for recovery.
 */
public class SubmittedJobGraph implements Serializable {

    private static final long serialVersionUID = 2836099271734771825L;

    /** The submitted {@link JobGraph} */
    private final JobGraph jobGraph;

    /** The {@link JobInfo}. */
    private final JobInfo jobInfo;

    /**
     * Creates a {@link SubmittedJobGraph}.
     *
     * @param jobGraph The submitted {@link JobGraph} (must not be {@code null})
     * @param jobInfo  The {@link JobInfo} (must not be {@code null})
     */
    public SubmittedJobGraph(JobGraph jobGraph, JobInfo jobInfo) {
        checkNotNull(jobGraph, "Job graph");
        checkNotNull(jobInfo, "Job info");

        this.jobGraph = jobGraph;
        this.jobInfo = jobInfo;
    }

    /**
     * Returns the submitted {@link JobGraph}.
     */
    public JobGraph getJobGraph() {
        return this.jobGraph;
    }

    /**
     * Returns the {@link JobID} of the submitted {@link JobGraph}.
     */
    public JobID getJobId() {
        final JobID id = this.jobGraph.getJobID();
        return id;
    }

    /**
     * Returns the {@link JobInfo} of the client who submitted the {@link JobGraph}.
     */
    public JobInfo getJobInfo() throws Exception {
        return this.jobInfo;
    }

    @Override
    public String toString() {
        final String description = String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
        return description;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import scala.Option;
import java.util.List;
/**
 * Store for {@link SubmittedJobGraph} instances, which are kept for recovery.
 */
public interface SubmittedJobGraphStore {
/**
 * Starts the {@link SubmittedJobGraphStore} service.
 *
 * @param jobGraphListener Listener to be notified about job graphs added or removed by other
 *                         {@link SubmittedJobGraphStore} instances
 */
void start(SubmittedJobGraphListener jobGraphListener) throws Exception;
/**
 * Stops the {@link SubmittedJobGraphStore} service.
 */
void stop() throws Exception;
/**
 * Returns a list of all submitted {@link JobGraph} instances.
 */
List<SubmittedJobGraph> recoverJobGraphs() throws Exception;
/**
 * Returns the {@link SubmittedJobGraph} with the given {@link JobID}.
 *
 * <p>An empty {@link Option} is returned, if no job graph with the given ID exists.
 */
Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception;
/**
 * Adds the {@link SubmittedJobGraph} instance.
 *
 * <p>If a job graph with the same {@link JobID} exists, it is replaced.
 */
void putJobGraph(SubmittedJobGraph jobGraph) throws Exception;
/**
 * Removes the {@link SubmittedJobGraph} with the given {@link JobID} if it exists.
 */
void removeJobGraph(JobID jobId) throws Exception;
/**
 * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
 * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
 */
interface SubmittedJobGraphListener {
/**
 * Callback for {@link SubmittedJobGraph} instances added by a different {@link
 * SubmittedJobGraphStore} instance.
 *
 * <p><strong>Important:</strong> It is possible to get false positives and be notified
 * about a job graph, which was added by this instance.
 *
 * @param jobId The {@link JobID} of the added job graph
 */
void onAddedJobGraph(JobID jobId);
/**
 * Callback for {@link SubmittedJobGraph} instances removed by a different {@link
 * SubmittedJobGraphStore} instance.
 *
 * @param jobId The {@link JobID} of the removed job graph
 */
void onRemovedJobGraph(JobID jobId);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
/**
* {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
*
* <p>Each job graph creates a ZNode:
* <pre>
* +----O /flink/jobgraphs/&lt;job-id&gt; 1 [persistent]
* .
* .
* .
* +----O /flink/jobgraphs/&lt;job-id&gt; N [persistent]
* </pre>
*
* <p>The root path is watched to detect concurrent modifications in corner situations where
* multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener}
* to react to such situations.
*/
public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class);
/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
private final Object cacheLock = new Object();
/** Client (not a namespace facade) */
private final CuratorFramework client;
/** The set of IDs of all added job graphs. */
private final Set<JobID> addedJobGraphs = new HashSet<>();
/** Submitted job graphs in ZooKeeper */
private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
/**
* Cache to monitor all children. This is used to detect races with other instances working
* on the same state.
*/
private final PathChildrenCache pathCache;
/** The external listener to be notified on races. */
private SubmittedJobGraphListener jobGraphListener;
/** Flag indicating whether this instance is running. */
private boolean isRunning;
public ZooKeeperSubmittedJobGraphStore(
CuratorFramework client,
String currentJobsPath,
StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
checkNotNull(currentJobsPath, "Current jobs path");
checkNotNull(stateHandleProvider, "State handle provider");
// Keep a reference to the original client and not the namespace facade. The namespace
// facade cannot be closed.
this.client = checkNotNull(client, "Curator client");
// Ensure that the job graphs path exists
client.newNamespaceAwareEnsurePath(currentJobsPath)
.ensure(client.getZookeeperClient());
// All operations will have the path as root
client = client.usingNamespace(client.getNamespace() + currentJobsPath);
this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(client, stateHandleProvider);
this.pathCache = new PathChildrenCache(client, "/", false);
pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
}
/**
 * Registers the listener and starts the path cache. Calling this on an already running
 * instance has no effect.
 */
@Override
public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
    synchronized (cacheLock) {
        if (isRunning) {
            // Already started
            return;
        }

        this.jobGraphListener = jobGraphListener;
        pathCache.start();
        isRunning = true;
    }
}
/**
 * Unregisters the listener, closes the path cache and the Curator client. Calling this on an
 * instance that is not running has no effect.
 *
 * <p>The client is closed and the instance is marked as stopped even if closing the path
 * cache fails. The failure is still propagated to the caller.
 */
@Override
public void stop() throws Exception {
    synchronized (cacheLock) {
        if (isRunning) {
            jobGraphListener = null;

            try {
                pathCache.close();
            }
            finally {
                // Don't leak the client if closing the cache throws
                client.close();
                isRunning = false;
            }
        }
    }
}
/**
 * Reads all job graphs from ZooKeeper and remembers their IDs as added by this instance.
 * Reading is retried for as long as it fails with a concurrent modification.
 */
@Override
public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
    synchronized (cacheLock) {
        verifyIsRunning();

        List<Tuple2<StateHandle<SubmittedJobGraph>, String>> handles;

        while (true) {
            try {
                handles = jobGraphsInZooKeeper.getAll();
                break;
            }
            catch (ConcurrentModificationException e) {
                LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
            }
        }

        if (handles.isEmpty()) {
            LOG.info("No job graph to recover.");
            return Collections.emptyList();
        }

        final List<SubmittedJobGraph> recovered = new ArrayList<>(handles.size());

        for (Tuple2<StateHandle<SubmittedJobGraph>, String> handleAndPath : handles) {
            final SubmittedJobGraph graph =
                    handleAndPath.f0.getState(ClassLoader.getSystemClassLoader());

            addedJobGraphs.add(graph.getJobId());
            recovered.add(graph);
        }

        LOG.info("Recovered {} job graphs: {}.", recovered.size(), recovered);
        return recovered;
    }
}
/**
 * Reads the job graph with the given ID from ZooKeeper and remembers its ID as added by this
 * instance. Returns an empty option if there is no node for the job.
 */
@Override
public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
    checkNotNull(jobId, "Job ID");

    final String jobPath = getPathForJob(jobId);

    synchronized (cacheLock) {
        verifyIsRunning();

        final SubmittedJobGraph recovered;

        try {
            recovered = jobGraphsInZooKeeper
                    .get(jobPath)
                    .getState(ClassLoader.getSystemClassLoader());
        }
        catch (KeeperException.NoNodeException ignored) {
            // No node for this job
            return Option.empty();
        }

        addedJobGraphs.add(recovered.getJobId());
        LOG.info("Recovered {}.", recovered);

        return Option.apply(recovered);
    }
}
/**
 * Adds the job graph to ZooKeeper or replaces the stored one.
 *
 * <p>A job graph is only replaced if this instance added or recovered it before. The operation
 * is retried if the node is concurrently created (on add) or removed (on replace).
 *
 * @throws IllegalStateException If a node for the job exists, but the job graph was neither
 *                               added nor recovered by this instance
 */
@Override
public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
    checkNotNull(jobGraph, "Job graph");

    final JobID jobId = jobGraph.getJobId();
    final String jobPath = getPathForJob(jobId);

    while (true) {
        // The lock is released between two attempts
        synchronized (cacheLock) {
            verifyIsRunning();

            // -1 is handled as "there is no node for this path" below
            final int version = jobGraphsInZooKeeper.exists(jobPath);

            if (version == -1) {
                try {
                    jobGraphsInZooKeeper.add(jobPath, jobGraph);
                    addedJobGraphs.add(jobId);
                    LOG.info("Added {} to ZooKeeper.", jobGraph);
                    return;
                }
                catch (KeeperException.NodeExistsException ignored) {
                    // Node was created in the meantime, try again
                }
            }
            else if (addedJobGraphs.contains(jobId)) {
                try {
                    jobGraphsInZooKeeper.replace(jobPath, version, jobGraph);
                    LOG.info("Updated {} in ZooKeeper.", jobGraph);
                    return;
                }
                catch (KeeperException.NoNodeException ignored) {
                    // Node was removed in the meantime, try again
                }
            }
            else {
                throw new IllegalStateException("Oh, no. Trying to update a graph you didn't " +
                        "#getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
            }
        }
    }
}
/**
 * Removes the job graph with the given ID from ZooKeeper and discards its state.
 *
 * <p>Job graphs that have not been added or recovered by this instance are left untouched.
 *
 * @param jobId ID of the job graph to remove
 * @throws Exception If the ZooKeeper or state handle operation fails
 */
@Override
public void removeJobGraph(JobID jobId) throws Exception {
    checkNotNull(jobId, "Job ID");
    final String jobPath = getPathForJob(jobId);

    synchronized (cacheLock) {
        if (!addedJobGraphs.contains(jobId)) {
            // Not one of ours, nothing to do
            return;
        }

        // Only forget the job graph after it has been removed successfully
        jobGraphsInZooKeeper.removeAndDiscardState(jobPath);
        addedJobGraphs.remove(jobId);

        LOG.info("Removed job graph {} from ZooKeeper.", jobId);
    }
}
/**
 * Monitors ZooKeeper for changes.
 *
 * <p>Detects modifications from other job managers in corner situations. The event
 * notifications fire for changes from this job manager as well.
 */
private final class SubmittedJobGraphsPathCacheListener implements PathChildrenCacheListener {

    /**
     * Handles a path cache event.
     *
     * <p>Added and removed children are reported to the registered listener (if any) while
     * holding the cache lock. Connection state changes are only logged. Failures in the
     * listener callback or while handling the event are logged and not propagated.
     *
     * @param client The client that received the event
     * @param event The path cache event
     */
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
            throws Exception {

        if (LOG.isDebugEnabled()) {
            if (event.getData() != null) {
                LOG.debug("Received {} event (path: {})", event.getType(), event.getData().getPath());
            }
            else {
                LOG.debug("Received {} event", event.getType());
            }
        }

        switch (event.getType()) {
            case CHILD_ADDED:
                synchronized (cacheLock) {
                    try {
                        JobID jobId = fromEvent(event);
                        if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
                            try {
                                // Whoa! This has been added by someone else. Or we were fast
                                // to remove it (false positive).
                                jobGraphListener.onAddedJobGraph(jobId);
                            }
                            catch (Throwable t) {
                                LOG.error("Error in callback", t);
                            }
                        }
                    }
                    catch (Exception e) {
                        LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
                    }
                }
                break;

            case CHILD_UPDATED:
                // Nothing to do
                break;

            case CHILD_REMOVED:
                synchronized (cacheLock) {
                    try {
                        JobID jobId = fromEvent(event);
                        if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
                            try {
                                // Oh oh. Someone else removed one of our job graphs. Mean!
                                jobGraphListener.onRemovedJobGraph(jobId);
                            }
                            catch (Throwable t) {
                                LOG.error("Error in callback", t);
                            }
                        }
                    }
                    catch (Exception e) {
                        LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
                    }
                }
                break;

            case CONNECTION_SUSPENDED:
                LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
                        "graphs are not monitored (temporarily).");
                // Must not fall through: a suspended connection is not a lost connection
                break;

            case CONNECTION_LOST:
                LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
                        "graphs are not monitored (permanently).");
                break;

            case CONNECTION_RECONNECTED:
                LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
                        "graphs are monitored again.");
                // Must not fall through: a reconnect is not an initialization
                break;

            case INITIALIZED:
                LOG.info("SubmittedJobGraphsPathCacheListener initialized");
                break;
        }
    }

    /**
     * Returns a JobID for the event's path.
     *
     * <p>The node name (last path element) is parsed as the hex string of the job ID.
     */
    private JobID fromEvent(PathChildrenCacheEvent event) {
        return JobID.fromHexString(ZKPaths.getNodeFromPath(event.getData().getPath()));
    }
}
/**
 * Ensures that the store has been started.
 *
 * @throws IllegalStateException If the store is not running
 */
private void verifyIsRunning() {
    if (!isRunning) {
        throw new IllegalStateException("Not running. Forgot to call start()?");
    }
}
/**
 * Returns the path for the given job, i.e. the JobID as a String with a leading slash.
 *
 * @param jobId The job ID to create the path for (must not be null)
 * @return The job ID prefixed with '/'
 */
public static String getPathForJob(JobID jobId) {
    // checkNotNull hands back the verified reference
    return String.format("/%s", checkNotNull(jobId, "Job ID"));
}
}
......@@ -67,4 +67,5 @@ public interface LeaderElectionService {
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
*/
boolean hasLeadership();
}
......@@ -43,6 +43,7 @@ import java.util.UUID;
* ZooKeeper as well.
*/
public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
/** Client to the ZooKeeper quorum */
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
/**
 * State backends that can be selected via the configuration key
 * {@link ConfigConstants#STATE_BACKEND}.
 */
public enum StateBackend {
    JOBMANAGER, FILESYSTEM;

    /**
     * Returns the configured {@link StateBackend}.
     *
     * <p>The configured value is matched case-insensitively against the enum constant names.
     *
     * @param config The config to parse
     * @return Configured state backend or {@link ConfigConstants#DEFAULT_STATE_BACKEND} if not
     * configured.
     * @throws IllegalArgumentException If the configured value does not name a state backend
     */
    public static StateBackend fromConfig(Configuration config) {
        String configured = config.getString(
                ConfigConstants.STATE_BACKEND,
                ConfigConstants.DEFAULT_STATE_BACKEND);

        // Upper-case with a fixed locale. The default locale can change the result, e.g.
        // with a Turkish locale "filesystem" becomes "FİLESYSTEM", which matches no constant.
        return StateBackend.valueOf(configured.toUpperCase(java.util.Locale.ROOT));
    }
}
......@@ -16,52 +16,46 @@
* limitations under the License.
*/
package org.apache.flink.runtime.util;
package org.apache.flink.runtime.state;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.configuration.IllegalConfigurationException;
import java.io.Serializable;
/**
* Utility class to help working with {@link LeaderElectionService} class.
* State handler provider factory.
*
* <p>This is going to be superseded soon.
*/
public final class LeaderElectionUtils {
public class StateHandleProviderFactory {
/**
* Creates a {@link LeaderElectionService} based on the provided {@link Configuration} object.
*
* @param configuration Configuration object
* @return {@link LeaderElectionService} which was created based on the provided Configuration
* @throws Exception
* Creates a {@link org.apache.flink.runtime.state.FileStateHandle.FileStateHandleProvider} at
* the configured recovery path.
*/
public static LeaderElectionService createLeaderElectionService(Configuration configuration) throws Exception {
RecoveryMode recoveryMode = RecoveryMode.valueOf(configuration.getString(
ConfigConstants.RECOVERY_MODE,
ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase()
);
LeaderElectionService leaderElectionService;
switch(recoveryMode) {
case STANDALONE:
leaderElectionService = new StandaloneLeaderElectionService();
break;
case ZOOKEEPER:
leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
break;
default:
throw new Exception("Unknown RecoveryMode " + recoveryMode);
public static <T extends Serializable> StateHandleProvider<T> createRecoveryFileStateHandleProvider(
Configuration config) {
StateBackend stateBackend = StateBackend.fromConfig(config);
if (stateBackend == StateBackend.FILESYSTEM) {
String recoveryPath = config.getString(
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
if (recoveryPath.equals("")) {
throw new IllegalConfigurationException("Missing recovery path. Specify via " +
"configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
}
else {
return FileStateHandle.createProvider(recoveryPath);
}
}
else {
throw new IllegalConfigurationException("Unexpected state backend configuration " +
stateBackend);
}
return leaderElectionService;
}
/**
* Private constructor to prevent instantiation.
*/
private LeaderElectionUtils() {
throw new RuntimeException();
}
}
......@@ -21,19 +21,27 @@ package org.apache.flink.runtime.util;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.state.StateHandleProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class to work with Apache Zookeeper for Flink runtime.
*/
public final class ZooKeeperUtils {
import static com.google.common.base.Preconditions.checkNotNull;
public class ZooKeeperUtils {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
......@@ -47,8 +55,10 @@ public final class ZooKeeperUtils {
public static CuratorFramework startCuratorFramework(Configuration configuration) {
String zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
if(zkQuorum == null || zkQuorum.equals("")) {
throw new RuntimeException("No valid ZooKeeper quorum has been specified.");
if (zkQuorum == null || zkQuorum.equals("")) {
throw new RuntimeException("No valid ZooKeeper quorum has been specified. " +
"You can specify the quorum via the configuration key '" +
ConfigConstants.ZOOKEEPER_QUORUM_KEY + "'.");
}
int sessionTimeout = configuration.getInteger(
......@@ -59,7 +69,7 @@ public final class ZooKeeperUtils {
ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
int retryWait = configuration.getInteger (
int retryWait = configuration.getInteger(
ConfigConstants.ZOOKEEPER_RETRY_WAIT,
ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
......@@ -88,14 +98,10 @@ public final class ZooKeeperUtils {
}
/**
* Returns whether high availability is enabled (<=> ZooKeeper quorum configured).
* Returns whether {@link RecoveryMode#ZOOKEEPER} is configured.
*/
public static boolean isZooKeeperHighAvailabilityEnabled(Configuration flinkConf) {
String recoveryMode = flinkConf.getString(
ConfigConstants.RECOVERY_MODE,
ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
return recoveryMode.equals(RecoveryMode.ZOOKEEPER.name());
public static boolean isZooKeeperRecoveryMode(Configuration flinkConf) {
return RecoveryMode.fromConfig(flinkConf).equals(RecoveryMode.ZOOKEEPER);
}
/**
......@@ -125,7 +131,7 @@ public final class ZooKeeperUtils {
* @throws Exception
*/
public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
Configuration configuration) throws Exception{
Configuration configuration) throws Exception {
CuratorFramework client = startCuratorFramework(configuration);
String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
......@@ -134,7 +140,8 @@ public final class ZooKeeperUtils {
}
/**
* Creates a {@link ZooKeeperLeaderElectionService} instance.
* Creates a {@link ZooKeeperLeaderElectionService} instance and a new {@link
* CuratorFramework} client.
*
* @param configuration {@link Configuration} object containing the configuration values
* @return {@link ZooKeeperLeaderElectionService} instance.
......@@ -142,8 +149,24 @@ public final class ZooKeeperUtils {
*/
public static ZooKeeperLeaderElectionService createLeaderElectionService(
        Configuration configuration) throws Exception {
    // Start a new client and hand over to the client-based variant
    return createLeaderElectionService(startCuratorFramework(configuration), configuration);
}
/**
* Creates a {@link ZooKeeperLeaderElectionService} instance.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
* @param configuration {@link Configuration} object containing the configuration values
* @return {@link ZooKeeperLeaderElectionService} instance.
* @throws Exception
*/
public static ZooKeeperLeaderElectionService createLeaderElectionService(
CuratorFramework client,
Configuration configuration) throws Exception {
String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
......@@ -152,6 +175,89 @@ public final class ZooKeeperUtils {
return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}
/**
 * Creates a {@link ZooKeeperSubmittedJobGraphStore} instance.
 *
 * <p>The ZooKeeper root path for the job graphs is read from the configuration key
 * {@link ConfigConstants#ZOOKEEPER_JOBGRAPHS_PATH}.
 *
 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 * @param configuration {@link Configuration} object
 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
 * @throws Exception If the state handle provider or the store cannot be created
 */
public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
        CuratorFramework client,
        Configuration configuration) throws Exception {

    checkNotNull(configuration, "Configuration");

    final StateHandleProvider<SubmittedJobGraph> provider =
            StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);

    // Root ZNode for the submitted job graphs
    final String jobGraphsRoot = configuration.getString(
            ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
            ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);

    return new ZooKeeperSubmittedJobGraphStore(client, jobGraphsRoot, provider);
}
/**
 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 *
 * <p>The checkpoints of the job are stored below the configured checkpoints root path
 * ({@link ConfigConstants#ZOOKEEPER_CHECKPOINTS_PATH}) followed by the path of the job.
 *
 * @param client                         The {@link CuratorFramework} ZooKeeper client to use
 * @param configuration                  {@link Configuration} object
 * @param jobId                          ID of job to create the instance for
 * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain
 * @param userClassLoader                User code class loader
 * @return {@link ZooKeeperCompletedCheckpointStore} instance
 * @throws Exception If the state handle provider or the store cannot be created
 */
public static CompletedCheckpointStore createCompletedCheckpoints(
        CuratorFramework client,
        Configuration configuration,
        JobID jobId,
        int maxNumberOfCheckpointsToRetain,
        ClassLoader userClassLoader) throws Exception {

    checkNotNull(configuration, "Configuration");

    final StateHandleProvider<CompletedCheckpoint> provider =
            StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);

    final String checkpointsRoot = configuration.getString(
            ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
            ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);

    // <checkpoints root>/<job ID>
    final String jobCheckpointsPath =
            checkpointsRoot + ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);

    return new ZooKeeperCompletedCheckpointStore(
            maxNumberOfCheckpointsToRetain,
            userClassLoader,
            client,
            jobCheckpointsPath,
            provider);
}
/**
 * Creates a {@link ZooKeeperCheckpointIDCounter} instance.
 *
 * <p>The counter of the job is located below the configured counter root path
 * ({@link ConfigConstants#ZOOKEEPER_CHECKPOINT_COUNTER_PATH}) followed by the path of the
 * job.
 *
 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 * @param configuration {@link Configuration} object
 * @param jobId         ID of job to create the instance for
 * @return {@link ZooKeeperCheckpointIDCounter} instance
 * @throws Exception If the counter cannot be created
 */
public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(
        CuratorFramework client,
        Configuration configuration,
        JobID jobId) throws Exception {

    final String counterRoot = configuration.getString(
            ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
            ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);

    // <counter root>/<job ID>
    final String jobCounterPath =
            counterRoot + ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);

    return new ZooKeeperCheckpointIDCounter(client, jobCounterPath);
}
/**
* Private constructor to prevent instantiation.
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.util.InstantiationUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* State handles backed by ZooKeeper.
*
* <p>Added state is persisted via {@link StateHandle}s, which in turn are written to
* ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
* small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs.
*
* <p>State modifications require some care, because it is possible that certain failures bring
* the state handle backend and ZooKeeper out of sync.
*
* <p>ZooKeeper holds the ground truth about state handles, i.e. the following holds:
*
* <pre>
* State handle in ZooKeeper => State handle exists
* </pre>
*
* But not:
*
* <pre>
* State handle exists => State handle in ZooKeeper
* </pre>
*
* There can be lingering state handles when failures happen during operation. They
* need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
* FLINK-2513</a> about a possible way to overcome this).
*
* @param <T> Type of state
*/
public class ZooKeeperStateHandleStore<T extends Serializable> {
/** Curator ZooKeeper client */
private final CuratorFramework client;
/** State handle provider */
private final StateHandleProvider<T> stateHandleProvider;
/**
 * Creates a {@link ZooKeeperStateHandleStore}.
 *
 * @param client              The Curator ZooKeeper client. <strong>Important:</strong> It is
 *                            expected that the client's namespace ensures that the root
 *                            path is exclusive for all state handles managed by this
 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
 * @param stateHandleProvider The state handle provider for the state
 * @throws NullPointerException If the client or the state handle provider is null
 */
public ZooKeeperStateHandleStore(
        CuratorFramework client,
        StateHandleProvider<T> stateHandleProvider) {

    // Validate first, assign afterwards
    checkNotNull(client, "Curator client");
    checkNotNull(stateHandleProvider, "State handle provider");

    this.client = client;
    this.stateHandleProvider = stateHandleProvider;
}
/**
 * Creates a state handle and stores it in ZooKeeper as a {@link CreateMode#PERSISTENT}
 * node.
 *
 * @param pathInZooKeeper Destination path in ZooKeeper
 * @param state           State to be added
 * @return Created {@link StateHandle}
 * @throws Exception If a ZooKeeper or state handle operation fails
 * @see #add(String, Serializable, CreateMode)
 */
public StateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
    final CreateMode defaultCreateMode = CreateMode.PERSISTENT;
    return add(pathInZooKeeper, state, defaultCreateMode);
}
/**
 * Creates a state handle and stores it in ZooKeeper.
 *
 * <p><strong>Important</strong>: This will <em>not</em> store the actual state in
 * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection
 * makes sure that data in ZooKeeper is small.
 *
 * <p>If the state handle cannot be written to ZooKeeper, it is discarded again.
 *
 * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and
 *                        start with a '/')
 * @param state           State to be added
 * @param createMode      The create mode for the new path in ZooKeeper
 * @return Created {@link StateHandle}
 * @throws Exception If a ZooKeeper or state handle operation fails
 */
public StateHandle<T> add(String pathInZooKeeper, T state, CreateMode createMode) throws Exception {
    checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    checkNotNull(state, "State");

    // Create the state handle. Nothing persisted yet.
    final StateHandle<T> newHandle = stateHandleProvider.createStateHandle(state);

    try {
        // Serialize the state handle. This writes the state to the backend.
        final byte[] handleBytes = InstantiationUtil.serializeObject(newHandle);

        // Only the (small) serialized handle goes to ZooKeeper, never the state itself,
        // because ZooKeeper is designed for data in the KB range.
        client.create().withMode(createMode).forPath(pathInZooKeeper, handleBytes);
    }
    catch (Throwable t) {
        // The handle did not make it to ZooKeeper => clean it up before failing
        if (newHandle != null) {
            newHandle.discardState();
        }
        throw t;
    }

    return newHandle;
}
/**
 * Replaces a state handle in ZooKeeper and discards the old state handle.
 *
 * <p>If the replacement fails, the newly created state handle is discarded instead and
 * the old one is kept.
 *
 * @param pathInZooKeeper Destination path in ZooKeeper (expected to exist and start with a '/')
 * @param expectedVersion Expected version of the node to replace
 * @param state           The new state to replace the old one
 * @throws Exception If a ZooKeeper or state handle operation fails
 */
public void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception {
    checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    checkNotNull(state, "State");

    final StateHandle<T> previousHandle = get(pathInZooKeeper);
    final StateHandle<T> newHandle = stateHandleProvider.createStateHandle(state);

    try {
        // Serialize the new state handle. This writes the state to the backend.
        final byte[] handleBytes = InstantiationUtil.serializeObject(newHandle);

        // Replace state handle in ZooKeeper.
        client.setData()
                .withVersion(expectedVersion)
                .forPath(pathInZooKeeper, handleBytes);
    }
    catch (Throwable t) {
        // Not replaced => the new handle is garbage
        newHandle.discardState();
        throw t;
    }

    // Replaced => the old handle is garbage
    previousHandle.discardState();
}
/**
 * Checks whether the given path exists and returns the version of its node.
 *
 * @param pathInZooKeeper Path in ZooKeeper to check
 * @return Version of the ZNode if the path exists, <code>-1</code> otherwise.
 * @throws Exception If the ZooKeeper operation fails
 */
public int exists(String pathInZooKeeper) throws Exception {
    checkNotNull(pathInZooKeeper, "Path in ZooKeeper");

    // A null Stat means that there is no node at the path
    final Stat nodeStat = client.checkExists().forPath(pathInZooKeeper);
    return nodeStat == null ? -1 : nodeStat.getVersion();
}
/**
 * Reads the serialized state handle stored at the given path and deserializes it.
 *
 * @param pathInZooKeeper Path in ZooKeeper to get the state handle from (expected to
 *                        exist and start with a '/').
 * @return The state handle
 * @throws Exception If a ZooKeeper or state handle operation fails
 */
@SuppressWarnings("unchecked")
public StateHandle<T> get(String pathInZooKeeper) throws Exception {
    checkNotNull(pathInZooKeeper, "Path in ZooKeeper");

    final byte[] handleBytes = client.getData().forPath(pathInZooKeeper);

    // The handle is deserialized with the system class loader
    final Object handle = InstantiationUtil
            .deserializeObject(handleBytes, ClassLoader.getSystemClassLoader());

    return (StateHandle<T>) handle;
}
/**
 * Gets all available state handles from ZooKeeper.
 *
 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
 * Every attempt starts with an empty result, so that state handles collected by an aborted
 * attempt are not returned twice.
 *
 * @return All state handles from ZooKeeper together with their paths. The list is empty if
 * the root path does not exist.
 * @throws Exception If a ZooKeeper or state handle operation fails
 */
@SuppressWarnings("unchecked")
public List<Tuple2<StateHandle<T>, String>> getAll() throws Exception {
    final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();

    retry:
    while (true) {
        // Drop the results of a previous (aborted) attempt
        stateHandles.clear();

        Stat initialStat = client.checkExists().forPath("/");
        if (initialStat == null) {
            // The root path does not exist => nothing to get
            return stateHandles;
        }

        // Initial cVersion (number of changes to the children of this node)
        int initialCVersion = initialStat.getCversion();

        List<String> children = client.getChildren().forPath("/");

        for (String child : children) {
            final String path = "/" + child;

            try {
                final StateHandle<T> stateHandle = get(path);
                stateHandles.add(new Tuple2<>(stateHandle, path));
            }
            catch (KeeperException.NoNodeException ignored) {
                // Concurrent deletion, retry
                continue retry;
            }
        }

        // Check for concurrent modifications. A vanished root counts as a modification.
        Stat finalStat = client.checkExists().forPath("/");
        if (finalStat != null && finalStat.getCversion() == initialCVersion) {
            return stateHandles;
        }
    }
}
/**
 * Gets all available state handles from ZooKeeper sorted by name (ascending).
 *
 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
 * Every attempt starts with an empty result, so that state handles collected by an aborted
 * attempt are not returned twice.
 *
 * @return All state handles in ZooKeeper together with their paths. The list is empty if
 * the root path does not exist.
 * @throws Exception If a ZooKeeper or state handle operation fails
 */
@SuppressWarnings("unchecked")
public List<Tuple2<StateHandle<T>, String>> getAllSortedByName() throws Exception {
    final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();

    retry:
    while (true) {
        // Drop the results of a previous (aborted) attempt
        stateHandles.clear();

        Stat initialStat = client.checkExists().forPath("/");
        if (initialStat == null) {
            // The root path does not exist => nothing to get
            return stateHandles;
        }

        // Initial cVersion (number of changes to the children of this node)
        int initialCVersion = initialStat.getCversion();

        List<String> children = ZKPaths.getSortedChildren(
                client.getZookeeperClient().getZooKeeper(),
                ZKPaths.fixForNamespace(client.getNamespace(), "/"));

        for (String child : children) {
            final String path = "/" + child;

            try {
                final StateHandle<T> stateHandle = get(path);
                stateHandles.add(new Tuple2<>(stateHandle, path));
            }
            catch (KeeperException.NoNodeException ignored) {
                // Concurrent deletion, retry
                continue retry;
            }
        }

        // Check for concurrent modifications. A vanished root counts as a modification.
        Stat finalStat = client.checkExists().forPath("/");
        if (finalStat != null && finalStat.getCversion() == initialCVersion) {
            return stateHandles;
        }
    }
}
/**
 * Deletes the node of a state handle (and its children, if any) from ZooKeeper.
 *
 * <p><strong>Important</strong>: this does not discard the state handle. If you want to
 * discard the state handle call {@link #removeAndDiscardState(String)}.
 *
 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
 * @throws Exception If the ZooKeeper operation fails
 */
public void remove(String pathInZooKeeper) throws Exception {
    checkNotNull(pathInZooKeeper, "Path in ZooKeeper");

    client.delete()
            .deletingChildrenIfNeeded()
            .forPath(pathInZooKeeper);
}
/**
 * Deletes the node of a state handle (and its children, if any) from ZooKeeper in the
 * background.
 *
 * <p><strong>Important</strong>: this does not discard the state handle. If you want to
 * discard the state handle call {@link #removeAndDiscardState(String)}.
 *
 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
 * @param callback        The callback after the operation finishes
 * @throws Exception If the ZooKeeper operation fails
 */
public void remove(String pathInZooKeeper, BackgroundCallback callback) throws Exception {
    checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    checkNotNull(callback, "Background callback");

    client.delete()
            .deletingChildrenIfNeeded()
            .inBackground(callback)
            .forPath(pathInZooKeeper);
}
/**
 * Removes a state handle from ZooKeeper and discards it afterwards.
 *
 * <p>If you only want to remove the state handle in ZooKeeper call {@link #remove(String)}.
 *
 * @param pathInZooKeeper Path of state handle to discard (expected to start with a '/')
 * @throws Exception If the ZooKeeper or state handle operation fails
 */
public void removeAndDiscardState(String pathInZooKeeper) throws Exception {
    checkNotNull(pathInZooKeeper, "Path in ZooKeeper");

    // Fetch the handle while its node still exists
    final StateHandle<T> handleToDiscard = get(pathInZooKeeper);

    // Delete the node first ...
    client.delete()
            .deletingChildrenIfNeeded()
            .forPath(pathInZooKeeper);

    // ... and discard the state only after the delete succeeded. The other way around, a
    // failure could leave a state handle in ZooKeeper that has already been discarded.
    handleToDiscard.discardState();
}
/**
 * Removes all state handles from ZooKeeper and discards them afterwards.
 *
 * @throws Exception If a ZooKeeper or state handle operation fails
 */
public void removeAndDiscardAllState() throws Exception {
    // Collect the handles while their nodes still exist
    final List<Tuple2<StateHandle<T>, String>> handlesAndPaths = getAll();

    // Delete all children of the (namespaced) root, but keep the root itself
    ZKPaths.deleteChildren(
            client.getZookeeperClient().getZooKeeper(),
            ZKPaths.fixForNamespace(client.getNamespace(), "/"),
            false);

    // Discard the state only after the nodes have been deleted successfully
    for (Tuple2<StateHandle<T>, String> handleAndPath : handlesAndPaths) {
        final StateHandle<T> handle = handleAndPath.f0;
        handle.discardState();
    }
}
}
......@@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobmanager
import akka.actor.ActorRef
import org.apache.flink.runtime.akka.ListeningBehaviour
/**
* Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor
......@@ -27,11 +27,20 @@ import akka.actor.ActorRef
* Additionally, it stores whether the job was started in the detached mode. Detached means that
* the submitting actor does not wait for the job result once the job has terminated.
*
* Important: This class is serializable, but needs to be deserialized in the context of an actor
* system in order to resolve the client [[ActorRef]]. It is possible to serialize the Akka URL
* manually, but it is cumbersome and complicates testing in certain scenarios, where you need to
* make sure to resolve the correct [[ActorRef]]s when submitting jobs (RepointableActorRef vs.
* RemoteActorRef).
*
* @param client Actor which submitted the job
* @param start Starting time
*/
class JobInfo(val client: ActorRef, val start: Long,
val sessionTimeout: Long) {
class JobInfo(
val client: ActorRef,
val listeningBehaviour: ListeningBehaviour,
val start: Long,
val sessionTimeout: Long) extends Serializable {
var sessionAlive = sessionTimeout > 0
......@@ -49,12 +58,16 @@ class JobInfo(val client: ActorRef, val start: Long,
}
}
override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)"
def setLastActive() =
lastActive = System.currentTimeMillis()
}
object JobInfo{
def apply(client: ActorRef, start: Long,
sessionTimeout: Long) =
new JobInfo(client, start, sessionTimeout)
def apply(
client: ActorRef,
listeningBehaviour: ListeningBehaviour,
start: Long,
sessionTimeout: Long) = new JobInfo(client, listeningBehaviour, start, sessionTimeout)
}
......@@ -65,6 +65,18 @@ object JobManagerMessages {
listeningBehaviour: ListeningBehaviour)
extends RequiresLeaderSessionID
/**
* Triggers the recovery of the job with the given ID.
*
* @param jobId ID of the job to recover
*/
case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID
/**
* Triggers recovery of all available jobs.
*/
case class RecoverAllJobs() extends RequiresLeaderSessionID
/**
* Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is
* sent back to the sender as a [[CancellationResponse]] message.
......@@ -354,6 +366,10 @@ object JobManagerMessages {
// --------------------------------------------------------------------------
// Utility methods to allow simpler case object access from Java
// --------------------------------------------------------------------------
/**
 * Wraps the given job ID in a [[RequestJobStatus]] message, so that callers do not have to
 * construct the message themselves.
 *
 * @param jobId ID of the job to put into the message
 * @return The [[RequestJobStatus]] message, typed as AnyRef
 */
def getRequestJobStatus(jobId : JobID) : AnyRef = {
RequestJobStatus(jobId)
}
def getRequestNumberRegisteredTaskManager : AnyRef = {
RequestNumberRegisteredTaskManager
......
......@@ -94,9 +94,7 @@ abstract class FlinkMiniCluster(
implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
val recoveryMode = RecoveryMode.valueOf(configuration.getString(
ConfigConstants.RECOVERY_MODE,
ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase)
val recoveryMode = RecoveryMode.fromConfig(configuration)
val numJobManagers = getNumberOfJobManagers
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
public abstract class CheckpointIDCounterTest extends TestLogger {
protected abstract CheckpointIDCounter createCompletedCheckpoints() throws Exception;
/**
 * Runs the shared counter tests against a {@link StandaloneCheckpointIDCounter}.
 */
public static class StandaloneCheckpointIDCounterTest extends CheckpointIDCounterTest {

    /**
     * Creates a new {@link StandaloneCheckpointIDCounter} on every call.
     */
    @Override
    protected CheckpointIDCounter createCompletedCheckpoints() throws Exception {
        final CheckpointIDCounter standaloneCounter = new StandaloneCheckpointIDCounter();
        return standaloneCounter;
    }
}
/**
 * Runs the shared counter tests against a {@link ZooKeeperCheckpointIDCounter} that uses the
 * client of a {@link ZooKeeperTestEnvironment}.
 */
public static class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTest {

    /** Test environment shared by all tests of this class. */
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);

    /**
     * Shuts down the shared test environment after all tests of this class have run.
     */
    @AfterClass
    public static void tearDown() throws Exception {
        // The field is final and initialized inline, so it can never be null here.
        ZOOKEEPER.shutdown();
    }

    /**
     * Calls {@code deleteAll()} on the test environment before each test.
     */
    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }

    /**
     * Creates a ZooKeeper-backed counter under the fixed path {@code /checkpoint-id-counter}.
     */
    @Override
    protected CheckpointIDCounter createCompletedCheckpoints() throws Exception {
        return new ZooKeeperCheckpointIDCounter(ZOOKEEPER.getClient(), "/checkpoint-id-counter");
    }
}
// ---------------------------------------------------------------------------------------------
/**
 * Verifies that consecutive {@code getAndIncrement()} calls on a started counter return
 * 1, 2, 3 and 4, in that order.
 */
@Test
public void testSerialIncrementAndGet() throws Exception {
    final CheckpointIDCounter idCounter = createCompletedCheckpoints();

    try {
        idCounter.start();

        // Four serial calls must hand out the IDs 1 through 4
        for (long expectedId = 1; expectedId <= 4; expectedId++) {
            assertEquals(expectedId, idCounter.getAndIncrement());
        }
    }
    finally {
        idCounter.stop();
    }
}
/**
 * Tests concurrent increment and get calls from multiple threads and verifies that the
 * returned counts, taken together and sorted, form the gapless sequence 1, 2, 3, ...
 */
@Test
public void testConcurrentGetAndIncrement() throws Exception {
    // Config
    final int numThreads = 8;

    // Setup
    final CountDownLatch startLatch = new CountDownLatch(1);
    final CheckpointIDCounter idCounter = createCompletedCheckpoints();
    idCounter.start();

    ExecutorService threadPool = null;
    try {
        threadPool = Executors.newFixedThreadPool(numThreads);

        List<Future<List<Long>>> pendingResults = new ArrayList<>(numThreads);
        for (int thread = 0; thread < numThreads; thread++) {
            Incrementer task = new Incrementer(startLatch, idCounter);
            pendingResults.add(threadPool.submit(task));
        }

        // Release all incrementers at once
        startLatch.countDown();

        final int expectedTotal = numThreads * Incrementer.NumIncrements;

        // Collect the counts observed by every thread
        List<Long> collected = new ArrayList<>(expectedTotal);
        for (Future<List<Long>> pending : pendingResults) {
            collected.addAll(pending.get());
        }

        // Verify: once sorted, the counts have to be exactly 1..expectedTotal
        Collections.sort(collected);
        assertEquals(expectedTotal, collected.size());

        long expectedId = 1;
        for (long actualId : collected) {
            assertEquals(expectedId, actualId);
            expectedId++;
        }

        // The next count handed out follows directly after the last one
        assertEquals(expectedTotal + 1, idCounter.getAndIncrement());
    }
    finally {
        if (threadPool != null) {
            threadPool.shutdown();
        }

        idCounter.stop();
    }
}
/**
 * Task that calls {@link CheckpointIDCounter#getAndIncrement()} a fixed number of times and
 * returns the observed values in call order.
 */
private static class Incrementer implements Callable<List<Long>> {

    /** Total number of {@link CheckpointIDCounter#getAndIncrement()} calls. */
    private final static int NumIncrements = 128;

    /** Latch the task waits on before it starts incrementing. */
    private final CountDownLatch startLatch;

    /** Counter to increment. */
    private final CheckpointIDCounter counter;

    public Incrementer(CountDownLatch startLatch, CheckpointIDCounter counter) {
        this.startLatch = startLatch;
        this.counter = counter;
    }

    /**
     * Waits for the start latch, then performs {@link #NumIncrements} increments with a short
     * random pause (below 20 ms) after each one.
     *
     * @return the values returned by the counter
     */
    @Override
    public List<Long> call() throws Exception {
        final Random pauseGenerator = new Random();
        final List<Long> observed = new ArrayList<>();

        // Block until the main thread kicks off execution
        startLatch.await();

        int remaining = NumIncrements;
        while (remaining > 0) {
            observed.add(counter.getAndIncrement());

            // To get some "random" interleaving ;)
            Thread.sleep(pauseGenerator.nextInt(20));
            remaining--;
        }

        return observed;
    }
}
}
......@@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
......@@ -79,10 +80,12 @@ public class CheckpointStateRestoreTest {
map.put(statelessId, stateless);
CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L,
CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[0], cl);
new ExecutionVertex[0], cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
// create ourselves a checkpoint with state
final long timestamp = 34623786L;
......@@ -148,10 +151,12 @@ public class CheckpointStateRestoreTest {
map.put(statelessId, stateless);
CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L,
CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[0], cl);
new ExecutionVertex[0], cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
// create ourselves a checkpoint with state
final long timestamp = 34623786L;
......@@ -188,10 +193,12 @@ public class CheckpointStateRestoreTest {
@Test
public void testNoCheckpointAvailable() {
try {
CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 1, 200000L,
CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 200000L,
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[0], cl);
new ExecutionVertex[0], cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
try {
coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.CheckpointMessagesTest;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Test for basic {@link CompletedCheckpointStore} contract.
*/
public abstract class CompletedCheckpointStoreTest extends TestLogger {
/**
* Creates the {@link CompletedCheckpointStore} implementation to be tested.
*/
protected abstract CompletedCheckpointStore createCompletedCheckpoints(
int maxNumberOfCheckpointsToRetain, ClassLoader userLoader) throws Exception;
// ---------------------------------------------------------------------------------------------
// Used to verify that checkpoints are discarded with the correct class loader
private final ClassLoader userClassLoader = ClassLoader.getSystemClassLoader();
/**
 * Tests that creating a store which retains zero checkpoints fails with an exception,
 * i.e. at least one checkpoint has to be retained.
 */
@Test(expected = Exception.class)
public void testExceptionOnNoRetainedCheckpoints() throws Exception {
    final int noRetainedCheckpoints = 0;
    createCompletedCheckpoints(noRetainedCheckpoints, userClassLoader);
}
/**
 * Tests that each added checkpoint increases the retained count by one and is returned as
 * the latest checkpoint afterwards.
 */
@Test
public void testAddAndGetLatestCheckpoint() throws Exception {
    CompletedCheckpointStore store = createCompletedCheckpoints(4, userClassLoader);

    // Nothing has been added yet
    assertEquals(0, store.getNumberOfRetainedCheckpoints());
    assertEquals(0, store.getAllCheckpoints().size());

    TestCheckpoint[] expected = new TestCheckpoint[] {
            createCheckpoint(0), createCheckpoint(1) };

    // Add one checkpoint at a time and look it up as the latest one
    for (int added = 0; added < expected.length; added++) {
        store.addCheckpoint(expected[added]);
        assertEquals(added + 1, store.getNumberOfRetainedCheckpoints());
        verifyCheckpoint(expected[added], store.getLatestCheckpoint());
    }
}
/**
 * Tests that a store retaining a single checkpoint discards the previously added checkpoint
 * (using the correct class loader) whenever a new one is added.
 */
@Test
public void testAddCheckpointMoreThanMaxRetained() throws Exception {
    CompletedCheckpointStore store = createCompletedCheckpoints(1, userClassLoader);

    TestCheckpoint[] expected = new TestCheckpoint[] {
            createCheckpoint(0), createCheckpoint(1),
            createCheckpoint(2), createCheckpoint(3)
    };

    // The first checkpoint fits within the limit of one retained checkpoint
    store.addCheckpoint(expected[0]);
    assertEquals(1, store.getNumberOfRetainedCheckpoints());

    // Every further checkpoint has to push out its predecessor
    for (int next = 1; next < expected.length; next++) {
        final TestCheckpoint superseded = expected[next - 1];

        store.addCheckpoint(expected[next]);

        // The ZooKeeper implementation discards asynchronously
        superseded.awaitDiscard();

        assertTrue(superseded.isDiscarded());
        assertEquals(userClassLoader, superseded.getDiscardClassLoader());
        assertEquals(1, store.getNumberOfRetainedCheckpoints());
    }
}
/**
 * Tests the behaviour of a store to which nothing has been added:
 * <ul>
 * <li>{@link CompletedCheckpointStore#getLatestCheckpoint()} returns <code>null</code>,</li>
 * <li>{@link CompletedCheckpointStore#getAllCheckpoints()} returns an empty list,</li>
 * <li>{@link CompletedCheckpointStore#getNumberOfRetainedCheckpoints()} returns 0.</li>
 * </ul>
 */
@Test
public void testEmptyState() throws Exception {
    CompletedCheckpointStore store = createCompletedCheckpoints(1, userClassLoader);

    final CompletedCheckpoint latest = store.getLatestCheckpoint();
    assertNull(latest);

    final int numberOfListedCheckpoints = store.getAllCheckpoints().size();
    assertEquals(0, numberOfListedCheckpoints);

    final int numberOfRetainedCheckpoints = store.getNumberOfRetainedCheckpoints();
    assertEquals(0, numberOfRetainedCheckpoints);
}
/**
 * Tests that all added checkpoints are returned, in the order in which they were added.
 */
@Test
public void testGetAllCheckpoints() throws Exception {
    CompletedCheckpointStore store = createCompletedCheckpoints(4, userClassLoader);

    TestCheckpoint[] expected = new TestCheckpoint[] {
            createCheckpoint(0), createCheckpoint(1),
            createCheckpoint(2), createCheckpoint(3)
    };

    for (int i = 0; i < expected.length; i++) {
        store.addCheckpoint(expected[i]);
    }

    List<CompletedCheckpoint> actual = store.getAllCheckpoints();
    assertEquals(expected.length, actual.size());

    // Compare position by position
    int position = 0;
    for (TestCheckpoint wanted : expected) {
        assertEquals(wanted, actual.get(position));
        position++;
    }
}
/**
 * Tests that {@code discardAllCheckpoints()} leaves the store empty and discards every
 * previously added checkpoint (using the correct class loader).
 */
@Test
public void testDiscardAllCheckpoints() throws Exception {
    CompletedCheckpointStore store = createCompletedCheckpoints(4, userClassLoader);

    TestCheckpoint[] expected = new TestCheckpoint[] {
            createCheckpoint(0), createCheckpoint(1),
            createCheckpoint(2), createCheckpoint(3)
    };

    for (int i = 0; i < expected.length; i++) {
        store.addCheckpoint(expected[i]);
    }

    store.discardAllCheckpoints();

    // The store must report an empty state afterwards
    assertNull(store.getLatestCheckpoint());
    assertEquals(0, store.getAllCheckpoints().size());
    assertEquals(0, store.getNumberOfRetainedCheckpoints());

    // Each checkpoint must have been discarded
    for (int i = 0; i < expected.length; i++) {
        final TestCheckpoint discarded = expected[i];

        // The ZooKeeper implementation discards asynchronously
        discarded.awaitDiscard();

        assertTrue(discarded.isDiscarded());
        assertEquals(userClassLoader, discarded.getDiscardClassLoader());
    }
}
// ---------------------------------------------------------------------------------------------
/**
 * Creates a test checkpoint with the given ID and four task states.
 *
 * @param id checkpoint ID
 * @return the newly created test checkpoint
 */
protected TestCheckpoint createCheckpoint(int id) throws IOException {
    final int defaultNumberOfStates = 4;
    return createCheckpoint(id, defaultNumberOfStates);
}
/**
 * Creates a test checkpoint with the given ID, a new {@link JobID}, timestamp 0 and the
 * requested number of task states. All task states share one new {@link JobVertexID} and
 * receive their position (0, 1, ...) as third constructor argument.
 *
 * @param id checkpoint ID
 * @param numberOfStates number of task states to create
 * @return the newly created test checkpoint
 */
protected TestCheckpoint createCheckpoint(int id, int numberOfStates)
        throws IOException {

    final JobVertexID vertexId = new JobVertexID();
    final ArrayList<StateForTask> states = new ArrayList<>();

    int position = 0;
    while (position < numberOfStates) {
        SerializedValue<StateHandle<?>> handle = new SerializedValue<StateHandle<?>>(
                new CheckpointMessagesTest.MyHandle());

        states.add(new StateForTask(handle, vertexId, position));
        position++;
    }

    return new TestCheckpoint(new JobID(), id, 0, states);
}
/**
 * Asserts that both checkpoints agree on job ID, checkpoint ID, timestamp and on their task
 * states (same number of states, pairwise equal at each position).
 */
private void verifyCheckpoint(CompletedCheckpoint expected, CompletedCheckpoint actual) {
    assertEquals(expected.getJobId(), actual.getJobId());
    assertEquals(expected.getCheckpointID(), actual.getCheckpointID());
    assertEquals(expected.getTimestamp(), actual.getTimestamp());

    final List<StateForTask> wantedStates = expected.getStates();
    final List<StateForTask> foundStates = actual.getStates();

    final int numberOfStates = wantedStates.size();
    assertEquals(numberOfStates, foundStates.size());

    int position = 0;
    while (position < numberOfStates) {
        assertEquals(wantedStates.get(position), foundStates.get(position));
        position++;
    }
}
/**
 * A test {@link CompletedCheckpoint}. We want to verify that the correct class loader is
 * used when discarding. Spying on a regular {@link CompletedCheckpoint} instance with
 * Mockito doesn't work, because it breaks serializability.
 */
protected static class TestCheckpoint extends CompletedCheckpoint {

    private static final long serialVersionUID = 4211419809665983026L;

    /** Set by the first {@link #discard(ClassLoader)} call. */
    private boolean isDiscarded;

    // Latch for test variants which discard asynchronously. The field is transient and thus
    // not restored by Java deserialization, which is why every access is guarded against null.
    private transient final CountDownLatch discardLatch = new CountDownLatch(1);

    /** Class loader handed to the first {@link #discard(ClassLoader)} call. */
    private transient ClassLoader discardClassLoader;

    public TestCheckpoint(
            JobID jobId,
            long checkpointId,
            long timestamp,
            ArrayList<StateForTask> states) {

        super(jobId, checkpointId, timestamp, states);
    }

    /**
     * Delegates to the super class and, on the first call only, records the class loader,
     * marks this checkpoint as discarded and releases threads blocked in
     * {@link #awaitDiscard()}.
     */
    @Override
    public void discard(ClassLoader userClassLoader) {
        super.discard(userClassLoader);

        if (isDiscarded) {
            // Only the first discard call is recorded
            return;
        }

        discardClassLoader = userClassLoader;
        isDiscarded = true;

        if (discardLatch != null) {
            discardLatch.countDown();
        }
    }

    public boolean isDiscarded() {
        return isDiscarded;
    }

    /**
     * Blocks until this checkpoint has been discarded. Returns immediately if no latch is
     * available.
     */
    public void awaitDiscard() throws InterruptedException {
        if (discardLatch == null) {
            return;
        }

        discardLatch.await();
    }

    public ClassLoader getDiscardClassLoader() {
        return discardClassLoader;
    }

    /**
     * Two test checkpoints are equal if they are of the same class and agree on job ID and
     * checkpoint ID.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }

        if (o == null || o.getClass() != getClass()) {
            return false;
        }

        final TestCheckpoint other = (TestCheckpoint) o;

        if (!getJobId().equals(other.getJobId())) {
            return false;
        }

        return getCheckpointID() == other.getCheckpointID();
    }

    @Override
    public int hashCode() {
        final int jobIdHash = getJobId().hashCode();
        final int checkpointIdPart = (int) getCheckpointID();

        return jobIdHash + checkpointIdPart;
    }
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
/**
 * Runs the basic {@link CompletedCheckpointStore} contract tests against the
 * {@link StandaloneCompletedCheckpointStore}.
 */
public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest {

    /**
     * Creates a new {@link StandaloneCompletedCheckpointStore} from the given arguments.
     */
    @Override
    protected CompletedCheckpointStore createCompletedCheckpoints(
            int maxNumberOfCheckpointsToRetain,
            ClassLoader userClassLoader) throws Exception {

        final CompletedCheckpointStore standaloneStore = new StandaloneCompletedCheckpointStore(
                maxNumberOfCheckpointsToRetain, userClassLoader);

        return standaloneStore;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
/**
 * Tests for basic {@link CompletedCheckpointStore} contract and ZooKeeper state handling.
 */
public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {

    /** Test environment shared by all tests of this class. */
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);

    /** ZooKeeper path handed to every store created by this test. */
    private static final String CHECKPOINTS_PATH = "/checkpoints";

    /**
     * Shuts down the shared test environment after all tests of this class have run.
     */
    @AfterClass
    public static void tearDown() throws Exception {
        // The field is final and initialized inline, so it can never be null here.
        ZOOKEEPER.shutdown();
    }

    /**
     * Calls {@code deleteAll()} on the test environment before each test.
     */
    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }

    /**
     * Creates a {@link ZooKeeperCompletedCheckpointStore} under {@link #CHECKPOINTS_PATH} with
     * a client obtained from {@code createClient()} and a local state handle provider.
     */
    @Override
    protected CompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain,
            ClassLoader userLoader) throws Exception {

        // NOTE(review): createClient() is called per store and the client is never closed
        // here - presumably the environment cleans it up on shutdown; confirm.
        return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
                ZOOKEEPER.createClient(), CHECKPOINTS_PATH,
                new LocalStateHandle.LocalStateHandleProvider<CompletedCheckpoint>());
    }

    // ---------------------------------------------------------------------------------------------

    /**
     * Tests that {@code recover()} leaves only the latest checkpoint in ZooKeeper and that
     * this checkpoint is returned as the latest one afterwards.
     */
    @Test
    public void testRecover() throws Exception {
        CompletedCheckpointStore checkpoints = createCompletedCheckpoints(3, ClassLoader
                .getSystemClassLoader());

        TestCheckpoint[] expected = new TestCheckpoint[] {
                createCheckpoint(0), createCheckpoint(1), createCheckpoint(2)
        };

        // Add multiple checkpoints
        checkpoints.addCheckpoint(expected[0]);
        checkpoints.addCheckpoint(expected[1]);
        checkpoints.addCheckpoint(expected[2]);

        // All three should be in ZK
        assertEquals(3, getNumberOfCheckpointNodes());

        // Recover
        checkpoints.recover();

        // Only the latest one should be in ZK
        Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();

        // Retry this operation, because removal is asynchronous
        while (deadline.hasTimeLeft() && getNumberOfCheckpointNodes() != 1) {
            // The deadline may expire between hasTimeLeft() and timeLeft(); clamp at zero,
            // because Thread.sleep rejects negative values with an IllegalArgumentException.
            long remainingMillis = deadline.timeLeft().toMillis();
            Thread.sleep(Math.max(0L, Math.min(100L, remainingMillis)));
        }

        assertEquals(1, getNumberOfCheckpointNodes());
        assertEquals(expected[2], checkpoints.getLatestCheckpoint());
    }

    /**
     * Returns the number of child nodes currently stored under {@link #CHECKPOINTS_PATH}.
     */
    private static int getNumberOfCheckpointNodes() throws Exception {
        return ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINTS_PATH).size();
    }
}
......@@ -178,6 +178,10 @@ public class BlobLibraryCacheManagerTest {
// un-register them again
libCache.unregisterTask(jid, executionId);
// Don't fail if called again
libCache.unregisterTask(jid, executionId);
assertEquals(0, libCache.getNumberOfReferenceHolders(jid));
// library is still cached (but not associated with job any more)
......
......@@ -25,7 +25,7 @@ import io.netty.channel.ChannelPromise;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.util.NetUtils;
import org.junit.Ignore;
import org.junit.Test;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册