提交 abbb0a93 编写于 作者: S Stephan Ewen

[FLINK-1668] [core] Add a config option to specify delays between restarts

上级 500ddff4
......@@ -40,6 +40,12 @@ public final class ConfigConstants {
* value to 0 effectively disables fault tolerance.
*/
public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
/**
* Config parameter for the delay between execution retries. The value must be specified in the
* notation "10 s" or "1 min" (style of Scala Finite Durations).
*
* <p>If this key is not set, the JobManager falls back to the value configured for
* {@link #AKKA_WATCH_HEARTBEAT_PAUSE}.
*/
public static final String DEFAULT_EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";
// -------------------------------- Runtime -------------------------------
......@@ -339,7 +345,7 @@ public final class ConfigConstants {
public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
/**
* Timeout for all blocking calls
* Timeout for all blocking calls that look up remote actors
*/
public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
......
......@@ -850,9 +850,21 @@ object JobManager {
ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
ConfigConstants.DEFAULT_EXECUTION_RETRIES)
val delayBetweenRetries = 2 * Duration(configuration.getString(
ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis
// Configure the delay between execution retries.
// Unless explicitly specified via DEFAULT_EXECUTION_RETRY_DELAY_KEY, the delay falls back to
// the heartbeat pause setting.
val pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
  ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
val delayString = configuration.getString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY,
  pauseString)
val delayBetweenRetries: Long = try {
  Duration(delayString).toMillis
}
catch {
  // Duration(String) throws NumberFormatException for unparsable input, and toMillis throws
  // IllegalArgumentException for infinite durations (e.g. "Inf"). NumberFormatException is a
  // subclass of IllegalArgumentException, so this single case covers both.
  // The message reports the value that was actually parsed (delayString) and keeps the
  // original exception as the cause.
  case e: IllegalArgumentException => throw new Exception(
    s"Invalid config value for ${ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY}: " +
      s"${delayString}. Value must be a valid duration (such as 100 milli or 1 min)", e)
}
val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
......
......@@ -49,6 +49,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, heartbeatTimeout)
new TestingCluster(config)
}
......
......@@ -56,8 +56,6 @@ public class AutoParallelismITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
cluster = new ForkableFlinkMiniCluster(config, false);
}
......
......@@ -125,6 +125,7 @@ public class ProcessFailureBatchRecoveryITCase {
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4 s");
jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
......
......@@ -51,7 +51,7 @@ public class SimpleRecoveryITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "100 ms");
cluster = new ForkableFlinkMiniCluster(config, false);
}
......
......@@ -137,6 +137,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "8000 ms")
config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
new ForkableFlinkMiniCluster(config, singleActorSystem = false)
......
......@@ -232,6 +232,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4000 ms")
config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
new ForkableFlinkMiniCluster(config, singleActorSystem = false)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册