From 57872d53c4584faace6dc8e4038ad1f2d068a453 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Thu, 5 Jul 2018 13:48:23 +0200 Subject: [PATCH] [FLINK-9143] Use cluster strategy if none was set on client side Added NoOrFixedIfCheckpointingEnabledRestartStrategy This closes #6283. --- .../flink/api/common/ExecutionConfig.java | 10 +-- .../restartstrategy/RestartStrategies.java | 48 ++++++++++++- .../runtime/webmonitor/WebFrontendITCase.java | 2 +- ...pointingEnabledRestartStrategyFactory.java | 42 +++++++++++ .../restart/NoRestartStrategy.java | 4 +- .../restart/RestartStrategyFactory.java | 63 ++++++++-------- .../restart/RestartStrategyResolving.java | 66 +++++++++++++++++ .../flink/runtime/jobgraph/JobGraph.java | 20 +++++- .../flink/runtime/jobmaster/JobMaster.java | 16 +++-- .../flink/runtime/jobmanager/JobManager.scala | 20 +++--- .../checkpoint/CoordinatorShutdownTest.java | 19 +++-- .../restart/RestartStrategyResolvingTest.java | 71 +++++++++++++++++++ .../runtime/jobmaster/JobMasterTest.java | 41 +++++++++++ ...estingJobManagerSharedServicesBuilder.java | 4 +- .../api/graph/StreamingJobGraphGenerator.java | 15 ---- .../streaming/api/RestartStrategyTest.java | 10 ++- 16 files changed, 367 insertions(+), 84 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 7a0a574ad20..59fa803791a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.util.Preconditions; import com.esotericsoftware.kryo.Serializer; @@ -138,7 +139,8 @@ public class ExecutionConfig implements Serializable, Archiveable 0 && getExecutionRetryDelay() >= 0) { return RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), getExecutionRetryDelay()); } else if (getNumberOfExecutionRetries() == 0) { return RestartStrategies.noRestart(); } else { - return null; + return restartStrategyConfiguration; } } else { return restartStrategyConfiguration; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java index f3eb3a5c03c..4f67290ac36 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.time.Time; import java.io.Serializable; +import java.util.Objects; import java.util.concurrent.TimeUnit; /** @@ -106,6 +107,19 @@ public class RestartStrategies { public String getDescription() { return "Restart deactivated."; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + return o instanceof NoRestartStrategyConfiguration; + } + + @Override + public int hashCode() { + return Objects.hash(); + } } /** @@ -188,6 +202,25 @@ public class RestartStrategies { return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString() + " and fixed delay " + delayBetweenAttemptsInterval.toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o; + return maxFailureRate == that.maxFailureRate && + Objects.equals(failureInterval, that.failureInterval) && + Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval); + } + + @Override + public int hashCode() { + return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval); + } } /** @@ -195,12 +228,25 @@ public class RestartStrategies { * strategy. Useful especially when one has a custom implementation of restart strategy set via * flink-conf.yaml. */ - public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration{ + public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration { private static final long serialVersionUID = -4441787204284085544L; @Override public String getDescription() { return "Cluster level default restart strategy"; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + return o instanceof FallbackRestartStrategyConfiguration; + } + + @Override + public int hashCode() { + return Objects.hash(); + } } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index fb8258a6b98..b90277fa771 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -313,7 +313,7 @@ public class WebFrontendITCase extends TestLogger { assertEquals(HttpResponseStatus.OK, response.getStatus()); assertEquals("application/json; charset=UTF-8", response.getType()); assertEquals("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\"," + - "\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," + + "\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster level default restart strategy\"," + "\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java new file mode 100644 index 00000000000..7b5c1a70bfc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +/** + * Default restart strategy that resolves either to {@link NoRestartStrategy} or {@link FixedDelayRestartStrategy} + * depending if checkpointing was enabled. + */ +public class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends RestartStrategyFactory { + private static final long DEFAULT_RESTART_DELAY = 0; + + private static final long serialVersionUID = -1809462525812787862L; + + @Override + public RestartStrategy createRestartStrategy() { + return createRestartStrategy(false); + } + + RestartStrategy createRestartStrategy(boolean isCheckpointingEnabled) { + if (isCheckpointingEnabled) { + return new FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY); + } else { + return new NoRestartStrategy(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java index 5502d2d2eab..b639614c1c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java @@ -38,10 +38,10 @@ public class NoRestartStrategy implements RestartStrategy { } /** - * Creates a NoRestartStrategy instance. + * Creates a NoRestartStrategyFactory instance. * * @param configuration Configuration object which is ignored - * @return NoRestartStrategy instance + * @return NoRestartStrategyFactory instance */ public static NoRestartStrategyFactory createFactory(Configuration configuration) { return new NoRestartStrategyFactory(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java index 717e1d26447..f15ee0bb097 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java @@ -82,38 +82,41 @@ public abstract class RestartStrategyFactory implements Serializable { * @throws Exception which indicates that the RestartStrategy could not be instantiated. */ public static RestartStrategyFactory createRestartStrategyFactory(Configuration configuration) throws Exception { - String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none"); + String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, null); + + if (restartStrategyName == null) { + // support deprecated ConfigConstants values + final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, + ConfigConstants.DEFAULT_EXECUTION_RETRIES); + String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE); + String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, + pauseString); + + long delay; + + try { + delay = Duration.apply(delayString).toMillis(); + } catch (NumberFormatException nfe) { + if (delayString.equals(pauseString)) { + throw new Exception("Invalid config value for " + + AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString + + ". Value must be a valid duration (such as '10 s' or '1 min')"); + } else { + throw new Exception("Invalid config value for " + + ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString + + ". Value must be a valid duration (such as '100 milli' or '10 s')"); + } + } + + if (numberExecutionRetries > 0 && delay >= 0) { + return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay); + } else { + return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(); + } + } switch (restartStrategyName.toLowerCase()) { case "none": - // support deprecated ConfigConstants values - final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, - ConfigConstants.DEFAULT_EXECUTION_RETRIES); - String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE); - String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, - pauseString); - - long delay; - - try { - delay = Duration.apply(delayString).toMillis(); - } catch (NumberFormatException nfe) { - if (delayString.equals(pauseString)) { - throw new Exception("Invalid config value for " + - AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString + - ". Value must be a valid duration (such as '10 s' or '1 min')"); - } else { - throw new Exception("Invalid config value for " + - ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString + - ". Value must be a valid duration (such as '100 milli' or '10 s')"); - } - } - - if (numberExecutionRetries > 0 && delay >= 0) { - return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay); - } else { - return NoRestartStrategy.createFactory(configuration); - } case "off": case "disable": return NoRestartStrategy.createFactory(configuration); @@ -149,7 +152,7 @@ public abstract class RestartStrategyFactory implements Serializable { } // fallback in case of an error - return NoRestartStrategy.createFactory(configuration); + return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java new file mode 100644 index 00000000000..ad7aa9395db --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; + +/** + * Utility method for resolving {@link RestartStrategy}. + */ +public final class RestartStrategyResolving { + + /** + * Resolves which {@link RestartStrategy} to use. It should be used only on the server side. + * The resolving strategy is as follows: + *
    + *
  1. Strategy set within job graph.
  2. + *
  3. Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing + * is enabled.
  4. + *
  5. If no strategy was set on client and server side and checkpointing was enabled then + * {@link FixedDelayRestartStrategy} is used
  6. + *
+ * + * @param clientConfiguration restart configuration given within the job graph + * @param serverStrategyFactory default server side strategy factory + * @param isCheckpointingEnabled if checkpointing was enabled for the job + * @return resolved strategy + */ + public static RestartStrategy resolve( + RestartStrategies.RestartStrategyConfiguration clientConfiguration, + RestartStrategyFactory serverStrategyFactory, + boolean isCheckpointingEnabled) { + + final RestartStrategy clientSideRestartStrategy = + RestartStrategyFactory.createRestartStrategy(clientConfiguration); + + if (clientSideRestartStrategy != null) { + return clientSideRestartStrategy; + } else { + if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) { + return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory) + .createRestartStrategy(isCheckpointingEnabled); + } else { + return serverStrategyFactory.createRestartStrategy(); + } + } + } + + private RestartStrategyResolving() { + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index b3e03de5619..377f870ca0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -329,7 +329,7 @@ public class JobGraph implements Serializable { * Sets the settings for asynchronous snapshots. A value of {@code null} means that * snapshotting is not enabled. * - * @param settings The snapshot settings, or null, to disable snapshotting. + * @param settings The snapshot settings */ public void setSnapshotSettings(JobCheckpointingSettings settings) { this.snapshotSettings = settings; @@ -339,12 +339,28 @@ public class JobGraph implements Serializable { * Gets the settings for asynchronous snapshots. This method returns null, when * checkpointing is not enabled. * - * @return The snapshot settings, or null, if checkpointing is not enabled. + * @return The snapshot settings */ public JobCheckpointingSettings getCheckpointingSettings() { return snapshotSettings; } + /** + * Checks if the checkpointing was enabled for this job graph + * + * @return true if checkpointing enabled + */ + public boolean isCheckpointingEnabled() { + + if (snapshotSettings == null) { + return false; + } + + long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval(); + return checkpointInterval > 0 && + checkpointInterval < Long.MAX_VALUE; + } + /** * Searches for a vertex with a matching ID and returns it. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 7557bc30aa0..1660f95d267 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmaster; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; @@ -51,7 +52,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.IntermediateResult; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; import org.apache.flink.runtime.heartbeat.HeartbeatListener; import org.apache.flink.runtime.heartbeat.HeartbeatManager; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -273,11 +274,11 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast .deserializeValue(userCodeLoader) .getRestartStrategy(); - this.restartStrategy = (restartStrategyConfiguration != null) ? - RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) : - jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy(); + this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration, + jobManagerSharedServices.getRestartStrategyFactory(), + jobGraph.isCheckpointingEnabled()); - log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid); + log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobName, jid); resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); @@ -1649,4 +1650,9 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast return CompletableFuture.completedFuture(null); } } + + @VisibleForTesting + RestartStrategy getRestartStrategy() { + return restartStrategy; + } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index cebff5881ac..1c8174f48ce 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -49,7 +49,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.executiongraph._ -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.executiongraph.restart.{RestartStrategyFactory, RestartStrategyResolving} import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager} @@ -1250,15 +1250,15 @@ class JobManager( throw new JobSubmissionException(jobId, "The given job is empty") } - val restartStrategy = - Option(jobGraph.getSerializedExecutionConfig() - .deserializeValue(userCodeLoader) - .getRestartStrategy()) - .map(RestartStrategyFactory.createRestartStrategy) - .filter(p => p != null) match { - case Some(strategy) => strategy - case None => restartStrategyFactory.createRestartStrategy() - } + val restartStrategyConfiguration = jobGraph + .getSerializedExecutionConfig + .deserializeValue(userCodeLoader) + .getRestartStrategy + + val restartStrategy = RestartStrategyResolving + .resolve(restartStrategyConfiguration, + restartStrategyFactory, + jobGraph.isCheckpointingEnabled) log.info(s"Using restart strategy $restartStrategy for $jobId.") diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index 8a6a9d8d47e..f6b7730a72e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -40,14 +42,14 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -68,7 +70,10 @@ public class CoordinatorShutdownTest extends TestLogger { JobVertex vertex = new JobVertex("Test Vertex"); vertex.setInvokableClass(FailingBlockingInvokable.class); List vertexIdList = Collections.singletonList(vertex.getID()); - + + final ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.noRestart()); + JobGraph testGraph = new JobGraph("test job", vertex); testGraph.setSnapshotSettings( new JobCheckpointingSettings( @@ -83,7 +88,9 @@ public class CoordinatorShutdownTest extends TestLogger { CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true), null)); - + testGraph.setExecutionConfig(executionConfig); + + ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java new file mode 100644 index 00000000000..4194e974e69 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fallBackRestart; +import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; + +/** + * Tests for {@link RestartStrategyResolving}. + */ +public class RestartStrategyResolvingTest extends TestLogger { + + @Test + public void testClientSideHighestPriority() { + RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(noRestart(), + new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(2, 1000L), + true); + + assertThat(resolvedStrategy, instanceOf(NoRestartStrategy.class)); + } + + @Test + public void testFixedStrategySetWhenCheckpointingEnabled() { + RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(), + new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(), + true); + + assertThat(resolvedStrategy, instanceOf(FixedDelayRestartStrategy.class)); + } + + @Test + public void testServerStrategyIsUsedSetWhenCheckpointingEnabled() { + RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(), + new FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, Time.seconds(5), Time.seconds(2)), + true); + + assertThat(resolvedStrategy, instanceOf(FailureRateRestartStrategy.class)); + } + + @Test + public void testServerStrategyIsUsedSetWhenCheckpointingDisabled() { + RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(), + new FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, Time.seconds(5), Time.seconds(2)), + false); + + assertThat(resolvedStrategy, instanceOf(FailureRateRestartStrategy.class)); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index d7dc017a02c..82fdc94a2a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -46,6 +46,9 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -110,6 +113,8 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Tests for {@link JobMaster}. @@ -357,6 +362,42 @@ public class JobMasterTest extends TestLogger { } } + /** + * Tests that in a streaming use case where checkpointing is enabled, a + * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart + * strategy has been specified. + */ + @Test + public void testAutomaticRestartingWhenCheckpointing() throws Exception { + // create savepoint data + final long savepointId = 42L; + final File savepointFile = createSavepoint(savepointId); + + // set savepoint settings + final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath( + savepointFile.getAbsolutePath(), + true); + final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings); + + final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); + final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory( + completedCheckpointStore, + new StandaloneCheckpointIDCounter()); + haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory); + final JobMaster jobMaster = createJobMaster( + new Configuration(), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build()); + + RestartStrategy restartStrategy = jobMaster.getRestartStrategy(); + + assertNotNull(restartStrategy); + assertTrue(restartStrategy instanceof FixedDelayRestartStrategy); + } + /** * Tests that an existing checkpoint will have precedence over an savepoint */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java index f0b232a4d7e..030e4e67e7a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; -import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; @@ -48,7 +48,7 @@ public class TestingJobManagerSharedServicesBuilder { public TestingJobManagerSharedServicesBuilder() { scheduledExecutorService = TestingUtils.defaultExecutor(); libraryCacheManager = mock(LibraryCacheManager.class); - restartStrategyFactory = new NoRestartStrategy.NoRestartStrategyFactory(); + restartStrategyFactory = new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(); stackTraceSampleCoordinator = mock(StackTraceSampleCoordinator.class); backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 603b9e411be..e905eac60f1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; @@ -81,12 +80,6 @@ public class StreamingJobGraphGenerator { private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class); - /** - * Restart delay used for the FixedDelayRestartStrategy in case checkpointing was enabled but - * no restart strategy has been specified. - */ - private static final long DEFAULT_RESTART_DELAY = 0L; - // ------------------------------------------------------------------------ public static JobGraph createJobGraph(StreamGraph streamGraph) { @@ -590,17 +583,9 @@ public class StreamingJobGraphGenerator { long interval = cfg.getCheckpointInterval(); if (interval > 0) { - ExecutionConfig executionConfig = streamGraph.getExecutionConfig(); // propagate the expected behaviour for checkpoint errors to task. executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors()); - - // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy - if (executionConfig.getRestartStrategy() == null) { - // if the user enabled checkpointing, the default number of exec retries is infinite. - executionConfig.setRestartStrategy( - RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY)); - } } else { // interval of max value means disable periodic checkpoint interval = Long.MAX_VALUE; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java index b231beac904..03b5a53bb60 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java @@ -33,12 +33,11 @@ import org.junit.Test; public class RestartStrategyTest extends TestLogger { /** - * Tests that in a streaming use case where checkpointing is enabled, a - * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart - * strategy has been specified. + * Tests that in a streaming use case where checkpointing is enabled, there is no default strategy set on the + * client side. */ @Test - public void testAutomaticRestartingWhenCheckpointing() throws Exception { + public void testFallbackStrategyOnClientSideWhenCheckpointingEnabled() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(500); @@ -51,8 +50,7 @@ public class RestartStrategyTest extends TestLogger { jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy(); Assert.assertNotNull(restartStrategy); - Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration); - Assert.assertEquals(Integer.MAX_VALUE, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getRestartAttempts()); + Assert.assertTrue(restartStrategy instanceof RestartStrategies.FallbackRestartStrategyConfiguration); } /** -- GitLab