diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java index 246a75c9fd87ae860b9daa602b9e4c9ff41436e7..eb9f3c5f83bdcb45264d9bf5fa2ca1a6583d98b2 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java @@ -21,6 +21,7 @@ package org.apache.flink.client.program; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CommonTestUtils; @@ -92,8 +93,8 @@ public class ClientConnectionTest extends TestLogger { private static void testFailureBehavior(final InetSocketAddress unreachableEndpoint) throws Exception { final Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT) + " ms"); - config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT) + " ms"); + config.setString(AkkaOptions.ASK_TIMEOUT, ASK_STARTUP_TIMEOUT + " ms"); + config.setString(AkkaOptions.LOOKUP_TIMEOUT, CONNECT_TIMEOUT + " ms"); config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName()); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort()); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index b7ade2a5e09fe949d6931c6c7e060ada510d9d60..13a25642f5e8a5902d38fe2b500882733b95e82f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -32,6 +32,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.optimizer.DataStatistics; @@ -97,7 +98,7 @@ public class ClientTest extends TestLogger { config = new Configuration(); config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); + config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue()); try { scala.Tuple2 address = new scala.Tuple2("localhost", freePort); diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java index 21794f9fefc36d505acb772a66b854049694e977..626335d948e2f9508d6caae81adcb60949b95559 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java @@ -39,6 +39,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.JobWithJars; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -269,7 +270,7 @@ public class FlinkClient { JobID getTopologyJobId(final String id) { final Configuration configuration = GlobalConfiguration.loadConfiguration(); if (this.timeout != null) { - configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout); + configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout); } try { @@ -309,7 +310,7 @@ public class FlinkClient { private FiniteDuration getTimeout() { final Configuration configuration = GlobalConfiguration.loadConfiguration(); if (this.timeout != null) { - configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout); + configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout); } return AkkaUtils.getClientTimeout(configuration); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 97b209e6136a9b6f1455b5f8adce9dcf6852d99f..9bfc237546473f6dde2c4f66649fe22cbc12985f 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -29,30 +29,114 @@ import org.apache.flink.annotation.PublicEvolving; public class AkkaOptions { /** - * Timeout for akka ask calls + * Timeout for akka ask calls. */ - public static final ConfigOption AKKA_ASK_TIMEOUT = ConfigOptions + public static final ConfigOption ASK_TIMEOUT = ConfigOptions .key("akka.ask.timeout") .defaultValue("10 s"); + /** + * The Akka death watch heartbeat interval. + */ + public static final ConfigOption WATCH_HEARTBEAT_INTERVAL = ConfigOptions + .key("akka.watch.heartbeat.interval") + .defaultValue(ASK_TIMEOUT.defaultValue()); + + /** + * The maximum acceptable Akka death watch heartbeat pause. + */ + public static final ConfigOption WATCH_HEARTBEAT_PAUSE = ConfigOptions + .key("akka.watch.heartbeat.pause") + .defaultValue(ASK_TIMEOUT.defaultValue()); + /** * The Akka tcp connection timeout. */ - public static final ConfigOption AKKA_TCP_TIMEOUT = ConfigOptions + public static final ConfigOption TCP_TIMEOUT = ConfigOptions .key("akka.tcp.timeout") .defaultValue("20 s"); /** - * The Akka death watch heartbeat interval. + * Timeout for the startup of the actor system. */ - public static final ConfigOption AKKA_WATCH_HEARTBEAT_INTERVAL = ConfigOptions - .key("akka.watch.heartbeat.interval") + public static final ConfigOption STARTUP_TIMEOUT = ConfigOptions + .key("akka.startup-timeout") + .noDefaultValue(); + + /** + * Heartbeat interval of the transport failure detector. + */ + public static final ConfigOption TRANSPORT_HEARTBEAT_INTERVAL = ConfigOptions + .key("akka.transport.heartbeat.interval") + .defaultValue("1000 s"); + + /** + * Allowed heartbeat pause for the transport failure detector. + */ + public static final ConfigOption TRANSPORT_HEARTBEAT_PAUSE = ConfigOptions + .key("akka.transport.heartbeat.pause") + .defaultValue("6000 s"); + + /** + * Detection threshold of transport failure detector. + */ + public static final ConfigOption TRANSPORT_THRESHOLD = ConfigOptions + .key("akka.transport.threshold") + .defaultValue(300.0); + + /** + * Detection threshold for the phi accrual watch failure detector. + */ + public static final ConfigOption WATCH_THRESHOLD = ConfigOptions + .key("akka.watch.threshold") + .defaultValue(12); + + /** + * Override SSL support for the Akka transport. + */ + public static final ConfigOption SSL_ENABLED = ConfigOptions + .key("akka.ssl.enabled") + .defaultValue(true); + + /** + * Maximum framesize of akka messages. + */ + public static final ConfigOption FRAMESIZE = ConfigOptions + .key("akka.framesize") + .defaultValue("10485760b"); + + /** + * Maximum number of messages until another actor is executed by the same thread. + */ + public static final ConfigOption DISPATCHER_THROUGHPUT = ConfigOptions + .key("akka.throughput") + .defaultValue(15); + + /** + * Log lifecycle events. + */ + public static final ConfigOption LOG_LIFECYCLE_EVENTS = ConfigOptions + .key("akka.log.lifecycle.events") + .defaultValue(false); + + /** + * Timeout for all blocking calls that look up remote actors. + */ + public static final ConfigOption LOOKUP_TIMEOUT = ConfigOptions + .key("akka.lookup.timeout") .defaultValue("10 s"); /** - * The maximum acceptable Akka death watch heartbeat pause. + * Timeout for all blocking calls on the client side. */ - public static final ConfigOption AKKA_WATCH_HEARTBEAT_PAUSE = ConfigOptions - .key("akka.watch.heartbeat.pause") + public static final ConfigOption CLIENT_TIMEOUT = ConfigOptions + .key("akka.client.timeout") .defaultValue("60 s"); + + /** + * Exit JVM on fatal Akka errors. + */ + public static final ConfigOption JVM_EXIT_ON_FATAL_ERROR = ConfigOptions + .key("akka.jvm-exit-on-fatal-error") + .defaultValue(true); } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index b5b5486b350edeae6fa9ac39aee9d6ca7f554f05..65e6c762f5f66aac0f0b43ca0447229267903c54 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -708,82 +708,130 @@ public final class ConfigConstants { /** * Timeout for the startup of the actor system + * + * @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout"; /** * Heartbeat interval of the transport failure detector + * + * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. */ + @Deprecated public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "akka.transport.heartbeat.interval"; /** * Allowed heartbeat pause for the transport failure detector + * + * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. */ + @Deprecated public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE = "akka.transport.heartbeat.pause"; /** * Detection threshold of transport failure detector + * + * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead. */ + @Deprecated public static final String AKKA_TRANSPORT_THRESHOLD = "akka.transport.threshold"; /** * Heartbeat interval of watch failure detector + * + * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_INTERVAL} instead. */ + @Deprecated public static final String AKKA_WATCH_HEARTBEAT_INTERVAL = "akka.watch.heartbeat.interval"; /** * Allowed heartbeat pause for the watch failure detector + * + * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_PAUSE} instead. */ + @Deprecated public static final String AKKA_WATCH_HEARTBEAT_PAUSE = "akka.watch.heartbeat.pause"; /** * Detection threshold for the phi accrual watch failure detector + * + * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead. */ + @Deprecated public static final String AKKA_WATCH_THRESHOLD = "akka.watch.threshold"; /** * Akka TCP timeout + * + * @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_TCP_TIMEOUT = "akka.tcp.timeout"; /** * Override SSL support for the Akka transport + * + * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead. */ + @Deprecated public static final String AKKA_SSL_ENABLED = "akka.ssl.enabled"; /** * Maximum framesize of akka messages + * + * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead. */ + @Deprecated public static final String AKKA_FRAMESIZE = "akka.framesize"; /** * Maximum number of messages until another actor is executed by the same thread + * + * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead. */ + @Deprecated public static final String AKKA_DISPATCHER_THROUGHPUT = "akka.throughput"; /** * Log lifecycle events + * + * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead. */ + @Deprecated public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events"; /** * Timeout for all blocking calls on the cluster side + * + * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout"; /** * Timeout for all blocking calls that look up remote actors + * + * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout"; /** * Timeout for all blocking calls on the client side + * + * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout"; /** * Exit JVM on fatal Akka errors + * + * @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead. */ + @Deprecated public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error"; // ----------------------------- Transport SSL Settings-------------------- @@ -1425,26 +1473,70 @@ public final class ConfigConstants { // ------------------------------ Akka Values ------------------------------ + /** + * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. + */ + @Deprecated public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s"; + /** + * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. + */ + @Deprecated public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s"; + /** + * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead. + */ + @Deprecated public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0; + /** + * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead. + */ + @Deprecated public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12; + /** + * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead. + */ + @Deprecated public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15; + /** + * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead. + */ + @Deprecated public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false; + /** + * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead. + */ + @Deprecated public static String DEFAULT_AKKA_FRAMESIZE = "10485760b"; + /** + * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead. + */ + @Deprecated public static String DEFAULT_AKKA_ASK_TIMEOUT = "10 s"; + /** + * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead. + */ + @Deprecated public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s"; + /** + * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead. + */ + @Deprecated public static String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s"; + /** + * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead. + */ + @Deprecated public static boolean DEFAULT_AKKA_SSL_ENABLED = true; // ----------------------------- SSL Values -------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java index 206c71beacec79b50ffa4b6fb1d3ab318c6c0af7..625880b445413ceaa6c2406415d7c608c5a27fad 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java @@ -26,6 +26,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -108,7 +109,7 @@ public class MesosTaskManagerRunner { } // tell akka to die in case of an error - configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); + configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); // Infer the resource identifier from the environment variable String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID)); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java index 4f9214821e90acb1a88ca83d6680a0f43610f0a5..c0dcc9911a24704032b3230926a5d259bc122015 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -23,7 +23,7 @@ import akka.dispatch.OnFailure; import akka.dispatch.OnSuccess; import akka.pattern.Patterns; import akka.util.Timeout; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -61,7 +61,7 @@ public class MetricFetcher { private final ActorSystem actorSystem; private final JobManagerRetriever retriever; private final ExecutionContext ctx; - private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS); + private final FiniteDuration timeout = new FiniteDuration(Duration.create(AkkaOptions.ASK_TIMEOUT.defaultValue()).toMillis(), TimeUnit.MILLISECONDS); private MetricStore metrics = new MetricStore(); private MetricDumpDeserializer deserializer = new MetricDumpDeserializer(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java index ffab9cc77636a4a52836298267042e5363b50f7a..9451e20e45f0a251729833b9fd71266403b37032 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java @@ -23,7 +23,7 @@ import akka.actor.Props; import akka.actor.Status; import akka.dispatch.Futures; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobClientMessages; @@ -114,7 +114,7 @@ public class JobAttachmentClientActor extends JobClientActor { client.tell( decorateMessage(new Status.Failure( new JobClientActorRegistrationTimeoutException("Registration for Job at the JobManager " + - "timed out. " + "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + + "timed out. " + "You may increase '" + AkkaOptions.CLIENT_TIMEOUT.key() + "' in case the JobManager needs more time to confirm the job client registration."))), getSelf()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java index a3fee21525dfa19c1ab918fae3b15d8da75e3615..babb0f66dfc53a2a0c6c36f69eab72065db32ec5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java @@ -22,7 +22,7 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status; import akka.dispatch.Futures; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.instance.ActorGateway; @@ -119,7 +119,7 @@ public class JobSubmissionClientActor extends JobClientActor { client.tell( decorateMessage(new Status.Failure( new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " + - "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " + + "You may increase '" + AkkaOptions.CLIENT_TIMEOUT.key() + "' in case the JobManager " + "needs more time to configure and confirm the job submission."))), getSelf()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 77dbad45a17e3e928798ff92abd77530f14154ea..f9c39c1c92a2e425b023f3f72f7eec34dbb6466c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -26,7 +26,7 @@ import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; @@ -148,7 +148,7 @@ public abstract class FlinkResourceManager timeoutOption = ConfigOptions - .key(ConfigConstants.AKKA_ASK_TIMEOUT) - .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); - - final String strTimeout = configuration.getString(timeoutOption); + final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT); final Time timeout; try { timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis()); } catch (NumberFormatException e) { throw new ConfigurationException("Could not parse the resource manager's timeout " + - "value " + timeoutOption + '.', e); + "value " + AkkaOptions.ASK_TIMEOUT + '.', e); } return new SlotManagerConfiguration(timeout, timeout, timeout); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index 8789eedf0427bfaeb718bf5a38f08c2e82fb1b29..810efff659e86d6c40dc35342e7145584e520b06 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import com.typesafe.config.Config; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -130,9 +130,7 @@ public class AkkaRpcServiceUtils { checkNotNull(config, "config is null"); - final boolean sslEnabled = config.getBoolean( - ConfigConstants.AKKA_SSL_ENABLED, - ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) && + final boolean sslEnabled = config.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config); return getRpcUrl( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index a6e47481084a6ad3c4b2b1156a0ce0c505ec79ae..ea9f5767b01571d6c8e6cb76fe52107d55f114b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -146,7 +147,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis()); } catch (Exception e) { throw new IllegalArgumentException( - "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT + + "Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() + "'.Use formats like '50 s' or '1 min' to specify the timeout."); } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 62fa73d1ddb3195db5411f1c3de6f872efe77630..60a33ba5f9e6b1363431641c8576b7ed3e1490f9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -184,13 +184,11 @@ object AkkaUtils { * @return Flink's basic Akka config */ private def getBasicAkkaConfig(configuration: Configuration): Config = { - val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT, - ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT) - val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, - ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) + val akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT) + val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS) val jvmExitOnFatalError = if ( - configuration.getBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true)){ + configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)){ "on" } else { "off" @@ -269,48 +267,36 @@ object AkkaUtils { bindAddress: String, port: Int, externalHostname: String, externalPort: Int): Config = { - val akkaAskTimeout = Duration(configuration.getString( - ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)) + val akkaAskTimeout = Duration(configuration.getString(AkkaOptions.ASK_TIMEOUT)) val startupTimeout = configuration.getString( - ConfigConstants.AKKA_STARTUP_TIMEOUT, + AkkaOptions.STARTUP_TIMEOUT, (akkaAskTimeout * 10).toString) val transportHeartbeatInterval = configuration.getString( - ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_INTERVAL, - ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL) + AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL) val transportHeartbeatPause = configuration.getString( - ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_PAUSE, - ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE) + AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE) - val transportThreshold = configuration.getDouble( - ConfigConstants.AKKA_TRANSPORT_THRESHOLD, - ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD) + val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD) - val watchHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL); + val watchHeartbeatInterval = configuration.getString( + AkkaOptions.WATCH_HEARTBEAT_INTERVAL) - val watchHeartbeatPause = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_PAUSE); + val watchHeartbeatPause = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE) - val watchThreshold = configuration.getDouble( - ConfigConstants.AKKA_WATCH_THRESHOLD, - ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD) + val watchThreshold = configuration.getInteger(AkkaOptions.WATCH_THRESHOLD) - val akkaTCPTimeout = configuration.getString(AkkaOptions.AKKA_TCP_TIMEOUT); + val akkaTCPTimeout = configuration.getString(AkkaOptions.TCP_TIMEOUT) - val akkaFramesize = configuration.getString( - ConfigConstants.AKKA_FRAMESIZE, - ConfigConstants.DEFAULT_AKKA_FRAMESIZE) + val akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE) - val lifecycleEvents = configuration.getBoolean( - ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, - ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) + val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS) val logLifecycleEvents = if (lifecycleEvents) "on" else "off" - val akkaEnableSSLConfig = configuration.getBoolean(ConfigConstants.AKKA_SSL_ENABLED, - ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) && + val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(configuration) val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off" @@ -588,14 +574,13 @@ object AkkaUtils { } def getTimeout(config: Configuration): FiniteDuration = { - val duration = Duration(config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)) + val duration = Duration(config.getString(AkkaOptions.ASK_TIMEOUT)) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } def getDefaultTimeout: Time = { - val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) + val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue()) Time.milliseconds(duration.toMillis) } @@ -607,30 +592,24 @@ object AkkaUtils { } def getLookupTimeout(config: Configuration): FiniteDuration = { - val duration = Duration(config.getString( - ConfigConstants.AKKA_LOOKUP_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT)) + val duration = Duration(config.getString(AkkaOptions.LOOKUP_TIMEOUT)) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } def getDefaultLookupTimeout: FiniteDuration = { - val duration = Duration(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT) + val duration = Duration(AkkaOptions.LOOKUP_TIMEOUT.defaultValue()) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } def getClientTimeout(config: Configuration): FiniteDuration = { - val duration = Duration( - config.getString( - ConfigConstants.AKKA_CLIENT_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT - )) + val duration = Duration(config.getString(AkkaOptions.CLIENT_TIMEOUT)) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } def getDefaultClientTimeout: FiniteDuration = { - val duration = Duration(ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT) + val duration = Duration(AkkaOptions.CLIENT_TIMEOUT.defaultValue()) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 2ace8db4c09d5f5aa0b1143d481f0a17471161f2..abc8946fbf428a7483c6243229304ad47cc5fe9b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -27,7 +27,7 @@ import akka.pattern.ask import akka.actor.{ActorRef, ActorSystem} import com.typesafe.config.Config import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult} -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} import org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.{JobClient, JobExecutionException} @@ -265,9 +265,9 @@ abstract class FlinkMiniCluster( // https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables if (sys.env.contains("CI")) { // Only set if nothing specified in config - if (config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, null) == null) { - val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) * 10 - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, s"${duration.toSeconds}s") + if (!config.contains(AkkaOptions.ASK_TIMEOUT)) { + val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue()) * 10 + config.setString(AkkaOptions.ASK_TIMEOUT, s"${duration.toSeconds}s") LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s") } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 5aa31ff9aa793201b95803bdf7223a0375ec89b3..f1bc43b9ad5fb03b2f696cc2ef555a7be972c7c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -22,7 +22,7 @@ import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; @@ -192,7 +192,7 @@ public class ResourceManagerTest extends TestLogger { // set a short timeout for lookups Configuration shortTimeoutConfig = config.clone(); - shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s"); + shortTimeoutConfig.setString(AkkaOptions.LOOKUP_TIMEOUT, "1 s"); fakeJobManager = TestingUtils.createForwardingActor( system, @@ -234,7 +234,7 @@ public class ResourceManagerTest extends TestLogger { // set a long timeout for lookups such that the test fails in case of timeouts Configuration shortTimeoutConfig = config.clone(); - shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s"); + shortTimeoutConfig.setString(AkkaOptions.LOOKUP_TIMEOUT, "99999 s"); fakeJobManager = TestingUtils.createForwardingActor( system, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index f19ca4e58a3a8712ef13da0b86fbd930879e6f03..0346e483d4d384d9d3397e5a32cb1452bd05f4fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -51,7 +52,7 @@ public class PartialConsumePipelinedResultTest { final Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS); flink = new TestingCluster(config, true); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index c8459e7fbab2c2a03479e9205c611bdc816bffc5..1a4396ec1dda7672977c6115f1c127e59d4add6d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestProbe; import com.typesafe.config.Config; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; @@ -601,7 +602,7 @@ public class JobManagerTest extends TestLogger { Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow(); Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms"); + config.setString(AkkaOptions.ASK_TIMEOUT, "100ms"); ActorRef jobManagerActor = JobManager.startJobManagerActors( config, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 9dcfc70c26bae48ee12fd601c6b38305eea357bf..7234feab3e540ed86dbb3e8c1ea0963978fb7274 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -27,6 +27,7 @@ import akka.actor.Props; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; @@ -77,9 +78,9 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { final int BUFFER_SIZE = 32 * 1024; Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "1 s"); - config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 1); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "1 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 1); ActorSystem actorSystem = null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 0844aad18a499fc48f0ad9c094159ab00a86f7fa..3953072e179d5f26ce1f43ed5f42156ae13ec31f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -23,6 +23,7 @@ import akka.actor.ActorSystem; import akka.actor.InvalidActorNameException; import akka.actor.Terminated; import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -90,10 +91,10 @@ public class TaskManagerRegistrationTest extends TestLogger { @BeforeClass public static void startActorSystem() { config = new Configuration(); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); - config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0); + config.setString(AkkaOptions.ASK_TIMEOUT, "5 s"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 2); actorSystem = AkkaUtils.createLocalActorSystem(config); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index 42338cda4452fa1072221ce61a7d2c64f476e6c4..48eb39222ce20c8f9c4314eeee3b83c32639243d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.testutils; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -86,10 +87,10 @@ public class ZooKeeperTestUtils { config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery"); // Akka failure detection and execution retries - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); - config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9); + config.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s"); return config; diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index 9f8e3e193b53276bfd1c1b1a5140c45aab8281ac..daf0f4734701dc84a86a1f983a72d4d717294691 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils, ScalaTestingUtils} import org.junit.runner.RunWith import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -119,7 +119,7 @@ class AkkaSslITCase(_system: ActorSystem) val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s") + config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "invalid.keystore") @@ -141,7 +141,7 @@ class AkkaSslITCase(_system: ActorSystem) val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s") + config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala index 97a001d07d55418c503eadf6a94637e1f6480905..6d7d87cbf2b68ed67e4afcb70c84db528e07f94e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala @@ -23,7 +23,7 @@ import java.net.{InetAddress, InetSocketAddress} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.util.NetUtils import org.junit.Assert._ @@ -122,7 +122,7 @@ class JobManagerConnectionTest { private def createConfigWithLowTimeout() : Configuration = { val config = new Configuration() - config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, + config.setString(AkkaOptions.LOOKUP_TIMEOUT, Duration(timeout, TimeUnit.MILLISECONDS).toSeconds + " s") config } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index f3ab40905782d61e3d210606bfc14702b6294479..4fc40423dadaebcb1590263f4ca9d1129995144b 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.{ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.io.network.partition.ResultPartitionType import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex} @@ -60,7 +60,7 @@ class RecoveryITCase(_system: ActorSystem) val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers) - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout) + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout) config.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay") config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1) config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, heartbeatTimeout) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index c8977f095ded76c245dbbe9ff3e57d1bb3ad498b..858bbbb9d01c542ca0fa05885c3bba11b64a7463 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -28,7 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors import com.typesafe.config.ConfigFactory import grizzled.slf4j.Logger import org.apache.flink.api.common.time.Time -import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions} +import org.apache.flink.configuration._ import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID @@ -89,7 +89,7 @@ object TestingUtils { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs) - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout) + config.setString(AkkaOptions.ASK_TIMEOUT, timeout) val cluster = new TestingCluster(config) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 437dd5f1deb07014265d114b1c52d2eff2deca42..5f6f5c46205c0f040598eda47773b638273000ad 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -146,8 +147,8 @@ public class TestBaseUtils extends TestLogger { config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE); config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s"); - config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT); + config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s"); + config.setString(AkkaOptions.STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT); config.setInteger(JobManagerOptions.WEB_PORT, 8081); config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); @@ -287,7 +288,7 @@ public class TestBaseUtils extends TestLogger { String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException { - + checkArgument(resultPath != null, "resultPath cannot be be null"); final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles); @@ -328,8 +329,8 @@ public class TestBaseUtils extends TestLogger { String msg = String.format( "Different elements in arrays: expected %d elements and received %d\n" + "files: %s\n expected: %s\n received: %s", - expected.length, result.length, - Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), + expected.length, result.length, + Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), Arrays.toString(expected), Arrays.toString(result)); fail(msg); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 49ff744a92cd33e6885f6a79aaa9b25501f122e0..92e57683385031f02c50c964233d459465ce7500 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.optimizer.DataStatistics; @@ -120,7 +121,7 @@ public class AccumulatorLiveITCase extends TestLogger { Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); TestingCluster testingCluster = new TestingCluster(config, false, true); testingCluster.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index 06233d6bfdf16d0fe8d1bc21a41a0a68d163a48b..27673121b63265035c241e1c15ae84f59a9eca22 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -20,6 +20,7 @@ package org.apache.flink.test.cancelling; import org.apache.flink.api.common.Plan; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -88,7 +89,7 @@ public abstract class CancelingTestBase extends TestLogger { config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index a573be63deca2db67d93ebcf6bf72d69192b8244..bda16796eb0eb3fcd0fad8c3fc8c7debb7dcc545 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -74,8 +75,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L); - config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s"); - config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s"); + config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s"); + config.setString(AkkaOptions.ASK_TIMEOUT, "60 s"); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java index e7bd5225b6aef8b36ebeb1bc291282fc206d7402..4905d43c2a947ff04997b9a3263c0b042ee8d818 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; @@ -44,7 +44,7 @@ public class StreamingCustomInputSplitProgram { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s"); + config.setString(AkkaOptions.ASK_TIMEOUT, "5 s"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java index 7c6f73aec462073afe6e48cb2e0ec799d3938500..85961db1d5298ce3bc60bd3eff6ef0a35828c6af 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; @@ -78,7 +79,7 @@ public class RemoteEnvironmentITCase extends TestLogger { @Test(expected=FlinkException.class) public void testInvalidAkkaConfiguration() throws Throwable { Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); + config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( cluster.getHostname(), @@ -103,7 +104,7 @@ public class RemoteEnvironmentITCase extends TestLogger { @Test public void testUserSpecificParallelism() throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); + config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( cluster.getHostname(), diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index c7c07ce1472ef1828edf566218a09800b3a29072..5c65a7ff528a2b23e93faa99c08714c2e9aaaece 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -24,6 +24,7 @@ import akka.pattern.Patterns; import akka.util.Timeout; import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; @@ -127,11 +128,11 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test Tuple2 localAddress = new Tuple2("localhost", jobManagerPort); Configuration jmConfig = new Configuration(); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); - jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); + jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms"); + jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s"); + jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9); jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s"); - jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); + jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1()); jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); @@ -409,7 +410,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); + cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, ResourceID.generate(), TaskManager.class); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java index 6d53b9f6d575abe765a5fa56d9ebc850d394fc6e..c8c8d2afd7785552e93702355a1458baf6a01ed2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -22,6 +22,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -156,9 +157,9 @@ public class ChaosMonkeyITCase extends TestLogger { ZooKeeper.getConnectString(), FileStateBackendBasePath.toURI().toString()); // Akka and restart timeouts - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); - config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9); if (checkpointingIntervalMs >= killEvery.toMillis()) { throw new IllegalArgumentException("Relax! You want to kill processes every " + diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 9d2806c5483a0d8e942755393458cafddd8834da..59d5a519ffac739f852dd63d250b6852704417e3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -29,7 +29,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -101,10 +101,10 @@ public class ProcessFailureCancelingITCase extends TestLogger { Tuple2 localAddress = new Tuple2("localhost", jobManagerPort); Configuration jmConfig = new Configuration(); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "5 s"); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2000 s"); - jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 10); - jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); + jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s"); + jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s"); + jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10); + jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1()); jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java index bafdd9f685d427a01d04ef275cde7e378ebb76ec..93d369a3b3487a6c902e0000aa800c2b4d0a601b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -77,9 +78,9 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s"); - config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "20 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 20); cluster = new LocalFlinkMiniCluster(config, false); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 40a8f09d55fe39b75235ddeaa3adf765a66e938a..37e89e926bdff97445fb039040f0117995d16c90 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -23,6 +23,7 @@ import akka.actor.Kill; import akka.actor.PoisonPill; import org.apache.curator.test.TestingServer; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -151,7 +152,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make // sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message - configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString()); + configuration.setString(AkkaOptions.ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString()); Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index 1da52d496242f251bae8b3862ecfb2ad1ab96cce..398a5eb5d3a9c048620beca3bc579cfac1cc4d9a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -127,7 +128,7 @@ public class YarnTaskExecutorRunner { } // tell akka to die in case of an error - configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); + configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); String keytabPath = null; if(remoteKeytabPath != null) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index 849a8a64d9450fba63a08be981f6a4558db468e1..047a1fae14db375c981f5256522ba9364d769bef 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Map; import java.util.concurrent.Callable; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; @@ -91,7 +92,7 @@ public class YarnTaskManagerRunner { } // tell akka to die in case of an error - configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); + configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); String localKeytabPath = null; if(remoteKeytabPath != null) {