未验证 提交 57872d53 编写于 作者: D Dawid Wysakowicz 提交者: Till Rohrmann

[FLINK-9143] Use cluster strategy if none was set on client side

Added NoOrFixedIfCheckpointingEnabledRestartStrategy

This closes #6283.
上级 c9ad0a07
......@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.util.Preconditions;
import com.esotericsoftware.kryo.Serializer;
......@@ -138,7 +139,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
@Deprecated
private long executionRetryDelay = DEFAULT_RESTART_DELAY;
private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
new RestartStrategies.FallbackRestartStrategyConfiguration();
private long taskCancellationIntervalMillis = -1;
......@@ -390,7 +392,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
this.restartStrategyConfiguration = restartStrategyConfiguration;
this.restartStrategyConfiguration = Preconditions.checkNotNull(restartStrategyConfiguration);
}
/**
......@@ -401,14 +403,14 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
@PublicEvolving
@SuppressWarnings("deprecation")
public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
if (restartStrategyConfiguration == null) {
if (restartStrategyConfiguration instanceof RestartStrategies.FallbackRestartStrategyConfiguration) {
// support the old API calls by creating a restart strategy from them
if (getNumberOfExecutionRetries() > 0 && getExecutionRetryDelay() >= 0) {
return RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), getExecutionRetryDelay());
} else if (getNumberOfExecutionRetries() == 0) {
return RestartStrategies.noRestart();
} else {
return null;
return restartStrategyConfiguration;
}
} else {
return restartStrategyConfiguration;
......
......@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.time.Time;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
......@@ -106,6 +107,19 @@ public class RestartStrategies {
public String getDescription() {
return "Restart deactivated.";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return o instanceof NoRestartStrategyConfiguration;
}
@Override
public int hashCode() {
return Objects.hash();
}
}
/**
......@@ -188,6 +202,25 @@ public class RestartStrategies {
return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
+ " and fixed delay " + delayBetweenAttemptsInterval.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
return maxFailureRate == that.maxFailureRate &&
Objects.equals(failureInterval, that.failureInterval) &&
Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
}
@Override
public int hashCode() {
return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval);
}
}
/**
......@@ -195,12 +228,25 @@ public class RestartStrategies {
* strategy. Useful especially when one has a custom implementation of restart strategy set via
* flink-conf.yaml.
*/
public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration{
public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
private static final long serialVersionUID = -4441787204284085544L;
@Override
public String getDescription() {
return "Cluster level default restart strategy";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return o instanceof FallbackRestartStrategyConfiguration;
}
@Override
public int hashCode() {
return Objects.hash();
}
}
}
......@@ -313,7 +313,7 @@ public class WebFrontendITCase extends TestLogger {
assertEquals(HttpResponseStatus.OK, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\"," +
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster level default restart strategy\"," +
"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph.restart;
/**
* Default restart strategy that resolves either to {@link NoRestartStrategy} or {@link FixedDelayRestartStrategy}
* depending if checkpointing was enabled.
*/
public class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends RestartStrategyFactory {
private static final long DEFAULT_RESTART_DELAY = 0;
private static final long serialVersionUID = -1809462525812787862L;
@Override
public RestartStrategy createRestartStrategy() {
return createRestartStrategy(false);
}
RestartStrategy createRestartStrategy(boolean isCheckpointingEnabled) {
if (isCheckpointingEnabled) {
return new FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
} else {
return new NoRestartStrategy();
}
}
}
......@@ -38,10 +38,10 @@ public class NoRestartStrategy implements RestartStrategy {
}
/**
* Creates a NoRestartStrategy instance.
* Creates a NoRestartStrategyFactory instance.
*
* @param configuration Configuration object which is ignored
* @return NoRestartStrategy instance
* @return NoRestartStrategyFactory instance
*/
public static NoRestartStrategyFactory createFactory(Configuration configuration) {
return new NoRestartStrategyFactory();
......
......@@ -82,38 +82,41 @@ public abstract class RestartStrategyFactory implements Serializable {
* @throws Exception which indicates that the RestartStrategy could not be instantiated.
*/
public static RestartStrategyFactory createRestartStrategyFactory(Configuration configuration) throws Exception {
String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none");
String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, null);
if (restartStrategyName == null) {
// support deprecated ConfigConstants values
final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
ConfigConstants.DEFAULT_EXECUTION_RETRIES);
String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
pauseString);
long delay;
try {
delay = Duration.apply(delayString).toMillis();
} catch (NumberFormatException nfe) {
if (delayString.equals(pauseString)) {
throw new Exception("Invalid config value for " +
AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
". Value must be a valid duration (such as '10 s' or '1 min')");
} else {
throw new Exception("Invalid config value for " +
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
". Value must be a valid duration (such as '100 milli' or '10 s')");
}
}
if (numberExecutionRetries > 0 && delay >= 0) {
return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay);
} else {
return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
}
}
switch (restartStrategyName.toLowerCase()) {
case "none":
// support deprecated ConfigConstants values
final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
ConfigConstants.DEFAULT_EXECUTION_RETRIES);
String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
pauseString);
long delay;
try {
delay = Duration.apply(delayString).toMillis();
} catch (NumberFormatException nfe) {
if (delayString.equals(pauseString)) {
throw new Exception("Invalid config value for " +
AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
". Value must be a valid duration (such as '10 s' or '1 min')");
} else {
throw new Exception("Invalid config value for " +
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
". Value must be a valid duration (such as '100 milli' or '10 s')");
}
}
if (numberExecutionRetries > 0 && delay >= 0) {
return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay);
} else {
return NoRestartStrategy.createFactory(configuration);
}
case "off":
case "disable":
return NoRestartStrategy.createFactory(configuration);
......@@ -149,7 +152,7 @@ public abstract class RestartStrategyFactory implements Serializable {
}
// fallback in case of an error
return NoRestartStrategy.createFactory(configuration);
return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
/**
* Utility method for resolving {@link RestartStrategy}.
*/
public final class RestartStrategyResolving {
/**
* Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
* The resolving strategy is as follows:
* <ol>
* <li>Strategy set within job graph.</li>
* <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing
* is enabled.</li>
* <li>If no strategy was set on client and server side and checkpointing was enabled then
* {@link FixedDelayRestartStrategy} is used</li>
* </ol>
*
* @param clientConfiguration restart configuration given within the job graph
* @param serverStrategyFactory default server side strategy factory
* @param isCheckpointingEnabled if checkpointing was enabled for the job
* @return resolved strategy
*/
public static RestartStrategy resolve(
RestartStrategies.RestartStrategyConfiguration clientConfiguration,
RestartStrategyFactory serverStrategyFactory,
boolean isCheckpointingEnabled) {
final RestartStrategy clientSideRestartStrategy =
RestartStrategyFactory.createRestartStrategy(clientConfiguration);
if (clientSideRestartStrategy != null) {
return clientSideRestartStrategy;
} else {
if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
.createRestartStrategy(isCheckpointingEnabled);
} else {
return serverStrategyFactory.createRestartStrategy();
}
}
}
private RestartStrategyResolving() {
}
}
......@@ -329,7 +329,7 @@ public class JobGraph implements Serializable {
* Sets the settings for asynchronous snapshots. A value of {@code null} means that
* snapshotting is not enabled.
*
* @param settings The snapshot settings, or null, to disable snapshotting.
* @param settings The snapshot settings
*/
public void setSnapshotSettings(JobCheckpointingSettings settings) {
this.snapshotSettings = settings;
......@@ -339,12 +339,28 @@ public class JobGraph implements Serializable {
* Gets the settings for asynchronous snapshots. This method returns null, when
* checkpointing is not enabled.
*
* @return The snapshot settings, or null, if checkpointing is not enabled.
* @return The snapshot settings
*/
public JobCheckpointingSettings getCheckpointingSettings() {
return snapshotSettings;
}
/**
* Checks if the checkpointing was enabled for this job graph
*
* @return true if checkpointing enabled
*/
public boolean isCheckpointingEnabled() {
if (snapshotSettings == null) {
return false;
}
long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
return checkpointInterval > 0 &&
checkpointInterval < Long.MAX_VALUE;
}
/**
* Searches for a vertex with a matching ID and returns it.
*
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
......@@ -51,7 +52,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
......@@ -273,11 +274,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
.deserializeValue(userCodeLoader)
.getRestartStrategy();
this.restartStrategy = (restartStrategyConfiguration != null) ?
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy();
this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
jobManagerSharedServices.getRestartStrategyFactory(),
jobGraph.isCheckpointingEnabled());
log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobName, jid);
resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
......@@ -1649,4 +1650,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
return CompletableFuture.completedFuture(null);
}
}
@VisibleForTesting
RestartStrategy getRestartStrategy() {
return restartStrategy;
}
}
......@@ -49,7 +49,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.executiongraph._
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.executiongraph.restart.{RestartStrategyFactory, RestartStrategyResolving}
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
......@@ -1250,15 +1250,15 @@ class JobManager(
throw new JobSubmissionException(jobId, "The given job is empty")
}
val restartStrategy =
Option(jobGraph.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy())
.map(RestartStrategyFactory.createRestartStrategy)
.filter(p => p != null) match {
case Some(strategy) => strategy
case None => restartStrategyFactory.createRestartStrategy()
}
val restartStrategyConfiguration = jobGraph
.getSerializedExecutionConfig
.deserializeValue(userCodeLoader)
.getRestartStrategy
val restartStrategy = RestartStrategyResolving
.resolve(restartStrategyConfiguration,
restartStrategyFactory,
jobGraph.isCheckpointingEnabled)
log.info(s"Using restart strategy $restartStrategy for $jobId.")
......
......@@ -18,6 +18,8 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
......@@ -40,14 +42,14 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
......@@ -68,7 +70,10 @@ public class CoordinatorShutdownTest extends TestLogger {
JobVertex vertex = new JobVertex("Test Vertex");
vertex.setInvokableClass(FailingBlockingInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
final ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.noRestart());
JobGraph testGraph = new JobGraph("test job", vertex);
testGraph.setSnapshotSettings(
new JobCheckpointingSettings(
......@@ -83,7 +88,9 @@ public class CoordinatorShutdownTest extends TestLogger {
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
testGraph.setExecutionConfig(executionConfig);
ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fallBackRestart;
import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
/**
* Tests for {@link RestartStrategyResolving}.
*/
public class RestartStrategyResolvingTest extends TestLogger {
@Test
public void testClientSideHighestPriority() {
RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(noRestart(),
new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(2, 1000L),
true);
assertThat(resolvedStrategy, instanceOf(NoRestartStrategy.class));
}
@Test
public void testFixedStrategySetWhenCheckpointingEnabled() {
RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(),
new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(),
true);
assertThat(resolvedStrategy, instanceOf(FixedDelayRestartStrategy.class));
}
@Test
public void testServerStrategyIsUsedSetWhenCheckpointingEnabled() {
RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(),
new FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, Time.seconds(5), Time.seconds(2)),
true);
assertThat(resolvedStrategy, instanceOf(FailureRateRestartStrategy.class));
}
@Test
public void testServerStrategyIsUsedSetWhenCheckpointingDisabled() {
RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(),
new FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, Time.seconds(5), Time.seconds(2)),
false);
assertThat(resolvedStrategy, instanceOf(FailureRateRestartStrategy.class));
}
}
......@@ -46,6 +46,9 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
......@@ -110,6 +113,8 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Tests for {@link JobMaster}.
......@@ -357,6 +362,42 @@ public class JobMasterTest extends TestLogger {
}
}
/**
* Tests that in a streaming use case where checkpointing is enabled, a
* fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
* strategy has been specified.
*/
@Test
public void testAutomaticRestartingWhenCheckpointing() throws Exception {
// create savepoint data
final long savepointId = 42L;
final File savepointFile = createSavepoint(savepointId);
// set savepoint settings
final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(
savepointFile.getAbsolutePath(),
true);
final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
completedCheckpointStore,
new StandaloneCheckpointIDCounter());
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
final JobMaster jobMaster = createJobMaster(
new Configuration(),
jobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder()
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
.build());
RestartStrategy restartStrategy = jobMaster.getRestartStrategy();
assertNotNull(restartStrategy);
assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
}
/**
* Tests that an existing checkpoint will have precedence over an savepoint
*/
......
......@@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
......@@ -48,7 +48,7 @@ public class TestingJobManagerSharedServicesBuilder {
public TestingJobManagerSharedServicesBuilder() {
scheduledExecutorService = TestingUtils.defaultExecutor();
libraryCacheManager = mock(LibraryCacheManager.class);
restartStrategyFactory = new NoRestartStrategy.NoRestartStrategyFactory();
restartStrategyFactory = new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
stackTraceSampleCoordinator = mock(StackTraceSampleCoordinator.class);
backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
}
......
......@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
......@@ -81,12 +80,6 @@ public class StreamingJobGraphGenerator {
private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
/**
* Restart delay used for the FixedDelayRestartStrategy in case checkpointing was enabled but
* no restart strategy has been specified.
*/
private static final long DEFAULT_RESTART_DELAY = 0L;
// ------------------------------------------------------------------------
public static JobGraph createJobGraph(StreamGraph streamGraph) {
......@@ -590,17 +583,9 @@ public class StreamingJobGraphGenerator {
long interval = cfg.getCheckpointInterval();
if (interval > 0) {
ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// propagate the expected behaviour for checkpoint errors to task.
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
if (executionConfig.getRestartStrategy() == null) {
// if the user enabled checkpointing, the default number of exec retries is infinite.
executionConfig.setRestartStrategy(
RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
}
} else {
// interval of max value means disable periodic checkpoint
interval = Long.MAX_VALUE;
......
......@@ -33,12 +33,11 @@ import org.junit.Test;
public class RestartStrategyTest extends TestLogger {
/**
* Tests that in a streaming use case where checkpointing is enabled, a
* fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
* strategy has been specified.
* Tests that in a streaming use case where checkpointing is enabled, there is no default strategy set on the
* client side.
*/
@Test
public void testAutomaticRestartingWhenCheckpointing() throws Exception {
public void testFallbackStrategyOnClientSideWhenCheckpointingEnabled() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
......@@ -51,8 +50,7 @@ public class RestartStrategyTest extends TestLogger {
jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
Assert.assertNotNull(restartStrategy);
Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
Assert.assertEquals(Integer.MAX_VALUE, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getRestartAttempts());
Assert.assertTrue(restartStrategy instanceof RestartStrategies.FallbackRestartStrategyConfiguration);
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册