提交 28c57c3a 编写于 作者: T Till Rohrmann

[FLINK-3800] [jobmanager] Terminate ExecutionGraphs properly

This PR terminates the ExecutionGraphs properly without restarts when the JobManager calls
cancelAndClearEverything. It is achieved by allowing the method to be only called with an
SuppressRestartsException. The SuppressRestartsException will disable the restart strategy of
the respective ExecutionGraph. This is important because the root cause could be a different
exception. In order to avoid race conditions, the restart strategy has to be checked twice for
whether it allows the job to be restarted: once before and once after the job has transitioned to
the state RESTARTING. This prevents ExecutionGraphs from becoming orphaned.

Furthermore, this PR fixes the problem that the default restart strategy is shared by multiple
jobs. The problem is solved by introducing a RestartStrategyFactory which creates for every
job its own instance of a RestartStrategy.

Fix LeaderChangeJobRecoveryTest case

This closes #1923.
上级 e29ac036
...@@ -798,6 +798,14 @@ public class ExecutionGraph implements Serializable { ...@@ -798,6 +798,14 @@ public class ExecutionGraph implements Serializable {
} }
public void fail(Throwable t) { public void fail(Throwable t) {
if (t instanceof SuppressRestartsException) {
if (restartStrategy != null) {
// disable the restart strategy in case that we have seen a SuppressRestartsException
// it basically overrides the restart behaviour of the root cause
restartStrategy.disable();
}
}
while (true) { while (true) {
JobStatus current = state; JobStatus current = state;
if (current == JobStatus.FAILING || current.isTerminalState()) { if (current == JobStatus.FAILING || current.isTerminalState()) {
...@@ -1021,15 +1029,17 @@ public class ExecutionGraph implements Serializable { ...@@ -1021,15 +1029,17 @@ public class ExecutionGraph implements Serializable {
} }
} }
else if (current == JobStatus.FAILING) { else if (current == JobStatus.FAILING) {
boolean allowRestart = !(failureCause instanceof SuppressRestartsException); if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
// double check in case that in the meantime a SuppressRestartsException was thrown
if (allowRestart && restartStrategy.canRestart() && if (restartStrategy.canRestart()) {
transitionState(current, JobStatus.RESTARTING)) { restartStrategy.restart(this);
restartStrategy.restart(this); break;
break; } else {
fail(new Exception("ExecutionGraph went into RESTARTING state but " +
} else if ((!allowRestart || !restartStrategy.canRestart()) && "then the restart strategy was disabled."));
transitionState(current, JobStatus.FAILED, failureCause)) { }
} else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) {
postRunCleanup(); postRunCleanup();
break; break;
} }
......
...@@ -41,6 +41,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { ...@@ -41,6 +41,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
private final int maxNumberRestartAttempts; private final int maxNumberRestartAttempts;
private final long delayBetweenRestartAttempts; private final long delayBetweenRestartAttempts;
private int currentRestartAttempt; private int currentRestartAttempt;
private boolean disabled = false;
public FixedDelayRestartStrategy( public FixedDelayRestartStrategy(
int maxNumberRestartAttempts, int maxNumberRestartAttempts,
...@@ -60,7 +61,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { ...@@ -60,7 +61,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
@Override @Override
public boolean canRestart() { public boolean canRestart() {
return currentRestartAttempt < maxNumberRestartAttempts; return !disabled && currentRestartAttempt < maxNumberRestartAttempts;
} }
@Override @Override
...@@ -83,6 +84,11 @@ public class FixedDelayRestartStrategy implements RestartStrategy { ...@@ -83,6 +84,11 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
}, executionGraph.getExecutionContext()); }, executionGraph.getExecutionContext());
} }
@Override
public void disable() {
disabled = true;
}
/** /**
* Creates a FixedDelayRestartStrategy from the given Configuration. * Creates a FixedDelayRestartStrategy from the given Configuration.
* *
...@@ -90,7 +96,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { ...@@ -90,7 +96,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
* @return Initialized instance of FixedDelayRestartStrategy * @return Initialized instance of FixedDelayRestartStrategy
* @throws Exception * @throws Exception
*/ */
public static FixedDelayRestartStrategy create(Configuration configuration) throws Exception { public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
String timeoutString = configuration.getString( String timeoutString = configuration.getString(
...@@ -118,7 +124,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { ...@@ -118,7 +124,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
} }
} }
return new FixedDelayRestartStrategy(maxAttempts, delay); return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
} }
@Override @Override
...@@ -128,4 +134,22 @@ public class FixedDelayRestartStrategy implements RestartStrategy { ...@@ -128,4 +134,22 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts + ", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
')'; ')';
} }
public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {
private static final long serialVersionUID = 6642934067762271950L;
private final int maxAttempts;
private final long delay;
public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
this.maxAttempts = maxAttempts;
this.delay = delay;
}
@Override
public RestartStrategy createRestartStrategy() {
return new FixedDelayRestartStrategy(maxAttempts, delay);
}
}
} }
...@@ -36,18 +36,31 @@ public class NoRestartStrategy implements RestartStrategy { ...@@ -36,18 +36,31 @@ public class NoRestartStrategy implements RestartStrategy {
throw new RuntimeException("NoRestartStrategy does not support restart."); throw new RuntimeException("NoRestartStrategy does not support restart.");
} }
@Override
public void disable() {}
/** /**
* Creates a NoRestartStrategy instance. * Creates a NoRestartStrategy instance.
* *
* @param configuration Configuration object which is ignored * @param configuration Configuration object which is ignored
* @return NoRestartStrategy instance * @return NoRestartStrategy instance
*/ */
public static NoRestartStrategy create(Configuration configuration) { public static NoRestartStrategyFactory createFactory(Configuration configuration) {
return new NoRestartStrategy(); return new NoRestartStrategyFactory();
} }
@Override @Override
public String toString() { public String toString() {
return "NoRestartStrategy"; return "NoRestartStrategy";
} }
public static class NoRestartStrategyFactory extends RestartStrategyFactory {
private static final long serialVersionUID = -1809462525812787862L;
@Override
public RestartStrategy createRestartStrategy() {
return new NoRestartStrategy();
}
}
} }
...@@ -38,4 +38,9 @@ public interface RestartStrategy { ...@@ -38,4 +38,9 @@ public interface RestartStrategy {
* @param executionGraph The ExecutionGraph to be restarted * @param executionGraph The ExecutionGraph to be restarted
*/ */
void restart(ExecutionGraph executionGraph); void restart(ExecutionGraph executionGraph);
/**
* Disables the restart strategy.
*/
void disable();
} }
...@@ -25,12 +25,21 @@ import org.slf4j.Logger; ...@@ -25,12 +25,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
public class RestartStrategyFactory { public abstract class RestartStrategyFactory implements Serializable {
private static final long serialVersionUID = 7320252552640522191L;
private static final Logger LOG = LoggerFactory.getLogger(RestartStrategyFactory.class); private static final Logger LOG = LoggerFactory.getLogger(RestartStrategyFactory.class);
private static final String CREATE_METHOD = "create"; private static final String CREATE_METHOD = "createFactory";
/**
* Factory method to create a restart strategy
* @return The created restart strategy
*/
public abstract RestartStrategy createRestartStrategy();
/** /**
* Creates a {@link RestartStrategy} instance from the given {@link org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}. * Creates a {@link RestartStrategy} instance from the given {@link org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}.
...@@ -58,11 +67,10 @@ public class RestartStrategyFactory { ...@@ -58,11 +67,10 @@ public class RestartStrategyFactory {
/** /**
* Creates a {@link RestartStrategy} instance from the given {@link Configuration}. * Creates a {@link RestartStrategy} instance from the given {@link Configuration}.
* *
* @param configuration Configuration object containing the configuration values.
* @return RestartStrategy instance * @return RestartStrategy instance
* @throws Exception which indicates that the RestartStrategy could not be instantiated. * @throws Exception which indicates that the RestartStrategy could not be instantiated.
*/ */
public static RestartStrategy createFromConfig(Configuration configuration) throws Exception { public static RestartStrategyFactory createRestartStrategyFactory(Configuration configuration) throws Exception {
String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase(); String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase();
switch (restartStrategyName) { switch (restartStrategyName) {
...@@ -92,16 +100,16 @@ public class RestartStrategyFactory { ...@@ -92,16 +100,16 @@ public class RestartStrategyFactory {
} }
if (numberExecutionRetries > 0 && delay >= 0) { if (numberExecutionRetries > 0 && delay >= 0) {
return new FixedDelayRestartStrategy(numberExecutionRetries, delay); return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay);
} else { } else {
return NoRestartStrategy.create(configuration); return NoRestartStrategy.createFactory(configuration);
} }
case "off": case "off":
case "disable": case "disable":
return NoRestartStrategy.create(configuration); return NoRestartStrategy.createFactory(configuration);
case "fixeddelay": case "fixeddelay":
case "fixed-delay": case "fixed-delay":
return FixedDelayRestartStrategy.create(configuration); return FixedDelayRestartStrategy.createFactory(configuration);
default: default:
try { try {
Class<?> clazz = Class.forName(restartStrategyName); Class<?> clazz = Class.forName(restartStrategyName);
...@@ -113,7 +121,7 @@ public class RestartStrategyFactory { ...@@ -113,7 +121,7 @@ public class RestartStrategyFactory {
Object result = method.invoke(null, configuration); Object result = method.invoke(null, configuration);
if (result != null) { if (result != null) {
return (RestartStrategy) result; return (RestartStrategyFactory) result;
} }
} }
} }
...@@ -128,7 +136,7 @@ public class RestartStrategyFactory { ...@@ -128,7 +136,7 @@ public class RestartStrategyFactory {
} }
// fallback in case of an error // fallback in case of an error
return NoRestartStrategy.create(configuration); return NoRestartStrategy.createFactory(configuration);
} }
} }
} }
...@@ -28,6 +28,7 @@ import akka.actor._ ...@@ -28,6 +28,7 @@ import akka.actor._
import akka.pattern.ask import akka.pattern.ask
import grizzled.slf4j.Logger import grizzled.slf4j.Logger
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
...@@ -118,7 +119,7 @@ class JobManager( ...@@ -118,7 +119,7 @@ class JobManager(
protected val scheduler: FlinkScheduler, protected val scheduler: FlinkScheduler,
protected val libraryCacheManager: BlobLibraryCacheManager, protected val libraryCacheManager: BlobLibraryCacheManager,
protected val archive: ActorRef, protected val archive: ActorRef,
protected val defaultRestartStrategy: RestartStrategy, protected val restartStrategyFactory: RestartStrategyFactory,
protected val timeout: FiniteDuration, protected val timeout: FiniteDuration,
protected val leaderElectionService: LeaderElectionService, protected val leaderElectionService: LeaderElectionService,
protected val submittedJobGraphs : SubmittedJobGraphStore, protected val submittedJobGraphs : SubmittedJobGraphStore,
...@@ -210,7 +211,7 @@ class JobManager( ...@@ -210,7 +211,7 @@ class JobManager(
log.info(s"Stopping JobManager $getAddress.") log.info(s"Stopping JobManager $getAddress.")
val newFuturesToComplete = cancelAndClearEverything( val newFuturesToComplete = cancelAndClearEverything(
new Exception("The JobManager is shutting down."), new SuppressRestartsException(new Exception("The JobManager is shutting down.")),
removeJobFromStateBackend = true) removeJobFromStateBackend = true)
implicit val executionContext = context.dispatcher implicit val executionContext = context.dispatcher
...@@ -307,7 +308,7 @@ class JobManager( ...@@ -307,7 +308,7 @@ class JobManager(
log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.") log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
val newFuturesToComplete = cancelAndClearEverything( val newFuturesToComplete = cancelAndClearEverything(
new Exception("JobManager is no longer the leader."), new SuppressRestartsException(new Exception("JobManager is no longer the leader.")),
removeJobFromStateBackend = false) removeJobFromStateBackend = false)
futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete) futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
...@@ -1071,7 +1072,7 @@ class JobManager( ...@@ -1071,7 +1072,7 @@ class JobManager(
val restartStrategy = Option(jobGraph.getExecutionConfig().getRestartStrategy()) val restartStrategy = Option(jobGraph.getExecutionConfig().getRestartStrategy())
.map(RestartStrategyFactory.createRestartStrategy(_)) match { .map(RestartStrategyFactory.createRestartStrategy(_)) match {
case Some(strategy) => strategy case Some(strategy) => strategy
case None => defaultRestartStrategy case None => restartStrategyFactory.createRestartStrategy()
} }
log.info(s"Using restart strategy $restartStrategy for $jobId.") log.info(s"Using restart strategy $restartStrategy for $jobId.")
...@@ -1629,7 +1630,7 @@ class JobManager( ...@@ -1629,7 +1630,7 @@ class JobManager(
* @param cause Cause for the cancelling. * @param cause Cause for the cancelling.
*/ */
private def cancelAndClearEverything( private def cancelAndClearEverything(
cause: Throwable, cause: SuppressRestartsException,
removeJobFromStateBackend: Boolean) removeJobFromStateBackend: Boolean)
: Seq[Future[Unit]] = { : Seq[Future[Unit]] = {
val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield { val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
...@@ -2265,7 +2266,7 @@ object JobManager { ...@@ -2265,7 +2266,7 @@ object JobManager {
InstanceManager, InstanceManager,
FlinkScheduler, FlinkScheduler,
BlobLibraryCacheManager, BlobLibraryCacheManager,
RestartStrategy, RestartStrategyFactory,
FiniteDuration, // timeout FiniteDuration, // timeout
Int, // number of archived jobs Int, // number of archived jobs
LeaderElectionService, LeaderElectionService,
...@@ -2281,8 +2282,7 @@ object JobManager { ...@@ -2281,8 +2282,7 @@ object JobManager {
ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
val restartStrategy = RestartStrategyFactory val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration)
.createFromConfig(configuration)
val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
......
...@@ -41,7 +41,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot ...@@ -41,7 +41,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.messages.StopCluster
import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobKey, BlobClient, BlobCache, BlobService} import org.apache.flink.runtime.blob.{BlobClient, BlobCache, BlobService}
import org.apache.flink.runtime.broadcast.BroadcastVariableManager import org.apache.flink.runtime.broadcast.BroadcastVariableManager
import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor} import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.execution.ExecutionState
......
...@@ -192,7 +192,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { ...@@ -192,7 +192,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
new Scheduler(TestingUtils.defaultExecutionContext()), new Scheduler(TestingUtils.defaultExecutionContext()),
new BlobLibraryCacheManager(new BlobServer(configuration), 10L), new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
ActorRef.noSender(), ActorRef.noSender(),
new NoRestartStrategy(), new NoRestartStrategy.NoRestartStrategyFactory(),
AkkaUtils.getDefaultTimeout(), AkkaUtils.getDefaultTimeout(),
leaderElectionService, leaderElectionService,
submittedJobGraphStore, submittedJobGraphStore,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.leaderelection;
import akka.actor.ActorRef;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertTrue;
/**
 * Tests how a running job behaves when the JobManager that executes it loses its leadership.
 *
 * <p>The test cluster is created with a {@link FixedDelayRestartStrategy} constructed with
 * {@code (9999, 100)}, i.e. a strategy that would allow a very large number of restart attempts.
 * The test then checks that the job nevertheless reaches a terminal state once leadership is
 * revoked.
 */
public class LeaderChangeJobRecoveryTest extends TestLogger {

	/** Upper bound used for every blocking wait in this test. */
	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(30, TimeUnit.SECONDS);

	private final int numTMs = 1;
	private final int numSlotsPerTM = 1;
	private final int parallelism = numTMs * numSlotsPerTM;

	private Configuration configuration;
	private LeaderElectionRetrievalTestingCluster cluster = null;

	/** Job submitted by the test; built once per test instance. */
	private final JobGraph job = createBlockingJob(parallelism);

	/**
	 * Starts a testing cluster with one JobManager and {@code numTMs} TaskManagers and waits
	 * until its actors are alive.
	 *
	 * @throws TimeoutException propagated from waiting for the actors to be alive
	 * @throws InterruptedException propagated from waiting for the actors to be alive
	 */
	@Before
	public void before() throws TimeoutException, InterruptedException {
		// re-arm the blocking receiver so that the job does not finish on its own
		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);

		configuration = new Configuration();
		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);

		cluster = new LeaderElectionRetrievalTestingCluster(
			configuration,
			true,
			false,
			new FixedDelayRestartStrategy(9999, 100));
		cluster.start(false);

		// wait for actors to be alive so that they have started their leader retrieval service
		cluster.waitForActorsToBeAlive();
	}

	/**
	 * Tests that the job is not restarted or at least terminates eventually in case that the
	 * JobManager loses its leadership.
	 *
	 * @throws Exception if any of the cluster interactions or waits fails
	 */
	@Test
	public void testNotRestartedWhenLosingLeadership() throws Exception {
		try {
			UUID leaderSessionID = UUID.randomUUID();

			cluster.grantLeadership(0, leaderSessionID);
			cluster.notifyRetrievalListeners(0, leaderSessionID);

			cluster.waitForTaskManagersToBeRegistered(TIMEOUT);

			cluster.submitJobDetached(job);

			ActorGateway jm = cluster.getLeaderGateway(TIMEOUT);

			// only continue once the job is actually running on the cluster
			Future<Object> wait = jm.ask(
				new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(job.getJobID()),
				TIMEOUT);
			Await.ready(wait, TIMEOUT);

			Future<Object> futureExecutionGraph = jm.ask(
				new TestingJobManagerMessages.RequestExecutionGraph(job.getJobID()),
				TIMEOUT);
			TestingJobManagerMessages.ResponseExecutionGraph responseExecutionGraph =
				(TestingJobManagerMessages.ResponseExecutionGraph) Await.result(futureExecutionGraph, TIMEOUT);

			assertTrue(responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound);

			ExecutionGraph executionGraph =
				((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();

			// listen for job status changes before the leadership is taken away
			TestActorGateway testActorGateway = new TestActorGateway();
			executionGraph.registerJobStatusListener(testActorGateway);

			cluster.revokeLeadership();

			Future<Boolean> hasReachedTerminalState = testActorGateway.hasReachedTerminalState();

			assertTrue(
				"The job should have reached a terminal state.",
				Await.result(hasReachedTerminalState, TIMEOUT));
		} finally {
			// Always shut the cluster down so that it does not outlive this test.
			// NOTE(review): stop() is assumed to be provided by the mini cluster base class;
			// consider moving this into an @After method if further tests are added.
			if (cluster != null) {
				cluster.stop();
			}
		}
	}

	/**
	 * Creates a two-vertex job (sender and receiver, connected pointwise, sharing one slot
	 * sharing group) whose receiver is a {@code Tasks.BlockingOnceReceiver}.
	 *
	 * @param parallelism parallelism applied to both vertices
	 * @return the job graph named "Blocking test job"
	 */
	public JobGraph createBlockingJob(int parallelism) {
		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);

		JobVertex sender = new JobVertex("sender");
		JobVertex receiver = new JobVertex("receiver");

		sender.setInvokableClass(Tasks.Sender.class);
		receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);

		sender.setParallelism(parallelism);
		receiver.setParallelism(parallelism);

		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);

		SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
		sender.setSlotSharingGroup(slotSharingGroup);
		receiver.setSlotSharingGroup(slotSharingGroup);

		ExecutionConfig executionConfig = new ExecutionConfig();

		return new JobGraph("Blocking test job", executionConfig, sender, receiver);
	}

	/**
	 * Minimal {@link ActorGateway} used as a job status listener. It completes a future as soon
	 * as it is told about a job status change into a terminal state; every other gateway
	 * operation is a no-op or returns {@code null}.
	 */
	public static class TestActorGateway implements ActorGateway {

		private static final long serialVersionUID = -736146686160538227L;

		/** Completed with {@code true} once a terminal job status has been observed. */
		private final transient Promise<Boolean> terminalState =
			new scala.concurrent.impl.Promise.DefaultPromise<>();

		/**
		 * Returns the future that is completed once a terminal job status has been reported.
		 *
		 * @return future completed with {@code true} on the first terminal status change
		 */
		public Future<Boolean> hasReachedTerminalState() {
			return terminalState.future();
		}

		@Override
		public Future<Object> ask(Object message, FiniteDuration timeout) {
			return null;
		}

		@Override
		public void tell(Object message) {
			this.tell(message, new AkkaActorGateway(ActorRef.noSender(), null));
		}

		@Override
		public void tell(Object message, ActorGateway sender) {
			if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
				ExecutionGraphMessages.JobStatusChanged jobStatusChanged =
					(ExecutionGraphMessages.JobStatusChanged) message;

				if (jobStatusChanged.newJobStatus().isTerminalState()) {
					// trySuccess instead of success: a repeated terminal notification must not
					// throw because the promise has already been completed
					terminalState.trySuccess(true);
				}
			}
		}

		@Override
		public void forward(Object message, ActorGateway sender) {
			// messages are not forwarded by this test gateway
		}

		@Override
		public Future<Object> retry(
				Object message,
				int numberRetries,
				FiniteDuration timeout,
				ExecutionContext executionContext) {
			return null;
		}

		@Override
		public String path() {
			return null;
		}

		@Override
		public ActorRef actor() {
			return null;
		}

		@Override
		public UUID leaderSessionID() {
			return null;
		}
	}
}
...@@ -68,7 +68,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger { ...@@ -68,7 +68,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false); cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, null);
cluster.start(false); // TaskManagers don't have to register at the JobManager cluster.start(false); // TaskManagers don't have to register at the JobManager
cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive
......
...@@ -20,6 +20,7 @@ package org.apache.flink.runtime.leaderelection; ...@@ -20,6 +20,7 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingCluster;
import scala.Option; import scala.Option;
...@@ -38,6 +39,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { ...@@ -38,6 +39,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
private final Configuration userConfiguration; private final Configuration userConfiguration;
private final boolean useSingleActorSystem; private final boolean useSingleActorSystem;
private final RestartStrategy restartStrategy;
public List<TestingLeaderElectionService> leaderElectionServices; public List<TestingLeaderElectionService> leaderElectionServices;
public List<TestingLeaderRetrievalService> leaderRetrievalServices; public List<TestingLeaderRetrievalService> leaderRetrievalServices;
...@@ -47,7 +49,8 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { ...@@ -47,7 +49,8 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
public LeaderElectionRetrievalTestingCluster( public LeaderElectionRetrievalTestingCluster(
Configuration userConfiguration, Configuration userConfiguration,
boolean singleActorSystem, boolean singleActorSystem,
boolean synchronousDispatcher) { boolean synchronousDispatcher,
RestartStrategy restartStrategy) {
super(userConfiguration, singleActorSystem, synchronousDispatcher); super(userConfiguration, singleActorSystem, synchronousDispatcher);
this.userConfiguration = userConfiguration; this.userConfiguration = userConfiguration;
...@@ -55,6 +58,8 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { ...@@ -55,6 +58,8 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
leaderElectionServices = new ArrayList<TestingLeaderElectionService>(); leaderElectionServices = new ArrayList<TestingLeaderElectionService>();
leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>(); leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>();
this.restartStrategy = restartStrategy;
} }
@Override @Override
...@@ -90,6 +95,15 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { ...@@ -90,6 +95,15 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER); ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
} }
@Override
public RestartStrategy getRestartStrategy(RestartStrategy other) {
if (this.restartStrategy != null) {
return this.restartStrategy;
} else {
return other;
}
}
public void grantLeadership(int index, UUID leaderSessionID) { public void grantLeadership(int index, UUID leaderSessionID) {
if(leaderIndex >= 0) { if(leaderIndex >= 0) {
// first revoke leadership // first revoke leadership
...@@ -109,4 +123,11 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { ...@@ -109,4 +123,11 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
service.notifyListener(address, leaderSessionID); service.notifyListener(address, leaderSessionID);
} }
} }
public void revokeLeadership() {
if (leaderIndex >= 0) {
leaderElectionServices.get(leaderIndex).notLeader();
leaderIndex = -1;
}
}
} }
...@@ -26,11 +26,11 @@ import akka.testkit.CallingThreadDispatcher ...@@ -26,11 +26,11 @@ import akka.testkit.CallingThreadDispatcher
import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.minicluster.FlinkMiniCluster import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.testutils.TestingResourceManager import org.apache.flink.runtime.testutils.TestingResourceManager
import org.apache.flink.util.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingMessages.Alive import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
...@@ -100,7 +100,7 @@ class TestingCluster( ...@@ -100,7 +100,7 @@ class TestingCluster(
instanceManager, instanceManager,
scheduler, scheduler,
libraryCacheManager, libraryCacheManager,
restartStrategy, restartStrategyFactory,
timeout, timeout,
archiveCount, archiveCount,
leaderElectionService, leaderElectionService,
...@@ -122,7 +122,7 @@ class TestingCluster( ...@@ -122,7 +122,7 @@ class TestingCluster(
scheduler, scheduler,
libraryCacheManager, libraryCacheManager,
archive, archive,
restartStrategy, restartStrategyFactory,
timeout, timeout,
leaderElectionService, leaderElectionService,
submittedJobsGraphs, submittedJobsGraphs,
...@@ -186,6 +186,10 @@ class TestingCluster( ...@@ -186,6 +186,10 @@ class TestingCluster(
None None
} }
/** Returns the given restart strategy unchanged.
  *
  * @param restartStrategy restart strategy to hand back
  * @return the same `restartStrategy` instance that was passed in
  */
def getRestartStrategy(restartStrategy: RestartStrategy): RestartStrategy = restartStrategy
@throws(classOf[TimeoutException]) @throws(classOf[TimeoutException])
@throws(classOf[InterruptedException]) @throws(classOf[InterruptedException])
def waitForTaskManagersToBeAlive(): Unit = { def waitForTaskManagersToBeAlive(): Unit = {
......
...@@ -23,7 +23,7 @@ import akka.actor.ActorRef ...@@ -23,7 +23,7 @@ import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
...@@ -44,7 +44,7 @@ class TestingJobManager( ...@@ -44,7 +44,7 @@ class TestingJobManager(
scheduler: Scheduler, scheduler: Scheduler,
libraryCacheManager: BlobLibraryCacheManager, libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef, archive: ActorRef,
restartStrategy: RestartStrategy, restartStrategyFactory: RestartStrategyFactory,
timeout: FiniteDuration, timeout: FiniteDuration,
leaderElectionService: LeaderElectionService, leaderElectionService: LeaderElectionService,
submittedJobGraphs : SubmittedJobGraphStore, submittedJobGraphs : SubmittedJobGraphStore,
...@@ -58,7 +58,7 @@ class TestingJobManager( ...@@ -58,7 +58,7 @@ class TestingJobManager(
scheduler, scheduler,
libraryCacheManager, libraryCacheManager,
archive, archive,
restartStrategy, restartStrategyFactory,
timeout, timeout,
leaderElectionService, leaderElectionService,
submittedJobGraphs, submittedJobGraphs,
......
...@@ -24,7 +24,7 @@ import akka.actor.ActorRef ...@@ -24,7 +24,7 @@ import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
...@@ -46,7 +46,7 @@ import scala.concurrent.duration.FiniteDuration ...@@ -46,7 +46,7 @@ import scala.concurrent.duration.FiniteDuration
* @param scheduler Scheduler to schedule Flink jobs * @param scheduler Scheduler to schedule Flink jobs
* @param libraryCacheManager Manager to manage uploaded jar files * @param libraryCacheManager Manager to manage uploaded jar files
* @param archive Archive for finished Flink jobs * @param archive Archive for finished Flink jobs
* @param restartStrategy Default restart strategy for job restarts * @param restartStrategyFactory Default restart strategy for job restarts
* @param timeout Timeout for futures * @param timeout Timeout for futures
* @param leaderElectionService LeaderElectionService to participate in the leader election * @param leaderElectionService LeaderElectionService to participate in the leader election
*/ */
...@@ -57,7 +57,7 @@ class TestingYarnJobManager( ...@@ -57,7 +57,7 @@ class TestingYarnJobManager(
scheduler: Scheduler, scheduler: Scheduler,
libraryCacheManager: BlobLibraryCacheManager, libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef, archive: ActorRef,
restartStrategy: RestartStrategy, restartStrategyFactory: RestartStrategyFactory,
timeout: FiniteDuration, timeout: FiniteDuration,
leaderElectionService: LeaderElectionService, leaderElectionService: LeaderElectionService,
submittedJobGraphs : SubmittedJobGraphStore, submittedJobGraphs : SubmittedJobGraphStore,
...@@ -71,7 +71,7 @@ class TestingYarnJobManager( ...@@ -71,7 +71,7 @@ class TestingYarnJobManager(
scheduler, scheduler,
libraryCacheManager, libraryCacheManager,
archive, archive,
restartStrategy, restartStrategyFactory,
timeout, timeout,
leaderElectionService, leaderElectionService,
submittedJobGraphs, submittedJobGraphs,
......
...@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID ...@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
import org.apache.flink.runtime.clusterframework.ApplicationStatus import org.apache.flink.runtime.clusterframework.ApplicationStatus
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.messages._
import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
...@@ -53,7 +53,7 @@ import scala.language.postfixOps ...@@ -53,7 +53,7 @@ import scala.language.postfixOps
* @param scheduler Scheduler to schedule Flink jobs * @param scheduler Scheduler to schedule Flink jobs
* @param libraryCacheManager Manager to manage uploaded jar files * @param libraryCacheManager Manager to manage uploaded jar files
* @param archive Archive for finished Flink jobs * @param archive Archive for finished Flink jobs
* @param restartStrategy Restart strategy to be used in case of a job recovery * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
* @param timeout Timeout for futures * @param timeout Timeout for futures
* @param leaderElectionService LeaderElectionService to participate in the leader election * @param leaderElectionService LeaderElectionService to participate in the leader election
*/ */
...@@ -64,7 +64,7 @@ class YarnJobManager( ...@@ -64,7 +64,7 @@ class YarnJobManager(
scheduler: FlinkScheduler, scheduler: FlinkScheduler,
libraryCacheManager: BlobLibraryCacheManager, libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef, archive: ActorRef,
restartStrategy: RestartStrategy, restartStrategyFactory: RestartStrategyFactory,
timeout: FiniteDuration, timeout: FiniteDuration,
leaderElectionService: LeaderElectionService, leaderElectionService: LeaderElectionService,
submittedJobGraphs : SubmittedJobGraphStore, submittedJobGraphs : SubmittedJobGraphStore,
...@@ -78,7 +78,7 @@ class YarnJobManager( ...@@ -78,7 +78,7 @@ class YarnJobManager(
scheduler, scheduler,
libraryCacheManager, libraryCacheManager,
archive, archive,
restartStrategy, restartStrategyFactory,
timeout, timeout,
leaderElectionService, leaderElectionService,
submittedJobGraphs, submittedJobGraphs,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册