diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index fd179c0e0a96648dcf56df6c2b3c423e668447cc..7517504ba08efb3eb375420eaa745a4df54c1849 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -18,8 +18,8 @@ package org.apache.flink.client.program; import org.apache.flink.api.common.JobSubmissionResult; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -53,8 +53,7 @@ public class StandaloneClusterClient extends ClusterClient { @Override public String getWebInterfaceURL() { String host = this.getJobManagerAddress().getHostString(); - int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); + int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT); return "http://" + host + ":" + port; } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 92e6b5d018efbb180ffe268e11b15b24383e5e02..c3704be43e29fadb263e489c124d34fdc5aa96af 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1338,36 +1338,81 @@ public final class ConfigConstants { key("jobmanager.web.address") .noDefaultValue(); - /** The config key for the port of the JobManager web frontend. - * Setting this value to {@code -1} disables the web frontend. */ + /** + * The config key for the port of the JobManager web frontend. + * Setting this value to {@code -1} disables the web frontend. + * + * @deprecated use {@link JobManagerOptions#WEB_PORT} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081; - /** Default value to override SSL support for the JobManager web UI */ + /** + * Default value to override SSL support for the JobManager web UI + * + * @deprecated use {@link JobManagerOptions#WEB_SSL_ENABLED} instead + */ + @Deprecated public static final boolean DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED = true; - /** The default number of archived jobs for the jobmanager */ + /** + * The default number of archived jobs for the jobmanager + * + * @deprecated use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5; - /** By default, submitting jobs from the web-frontend is allowed. */ + /** + * By default, submitting jobs from the web-frontend is allowed. + * + * @deprecated use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead + */ + @Deprecated public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true; /** @deprecated Config key has been deprecated. Therefore, no default value required. */ @Deprecated public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false; - /** Default number of checkpoints to remember for recent history. */ + /** + * Default number of checkpoints to remember for recent history. + * + * @deprecated use {@link JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10; - /** Time after which cached stats are cleaned up. */ + /** + * Time after which cached stats are cleaned up. + * + * @@deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000; - /** Time after which available stats are deprecated and need to be refreshed (by resampling). */ + /** + * Time after which available stats are deprecated and need to be refreshed (by resampling). + * + * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000; - /** Number of samples to take to determine back pressure. */ + /** + * Number of samples to take to determine back pressure. + * + * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100; - /** Delay between samples to determine back pressure. */ + /** + * Delay between samples to determine back pressure. + * + * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 50; // ------------------------------ Akka Values ------------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index b924e8e23a314150f20f09e04491d98ff17f5c3a..76b6bed590b18afb0488b6378d12d09e1da7f1b4 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -115,6 +115,13 @@ public class JobManagerOptions { key("jobmanager.web.access-control-allow-origin") .defaultValue("*"); + /** + * The config parameter defining the refresh interval for the web-frontend. + */ + public static final ConfigOption WEB_REFRESH_INTERVAL = + key("jobmanager.web.refresh-interval") + .defaultValue(3000L); + /** * Config parameter to override SSL support for the JobManager Web UI */ diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 709ef0940180a348030b02d66bcccac4d22fd640..3d8a38471c990a45be864d24141be16039d59dd5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -1235,10 +1235,6 @@ public abstract class ExecutionEnvironment { public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) { checkNotNull(conf, "conf"); - if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) { - int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT; - conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port); - } conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); LocalEnvironment localEnv = new LocalEnvironment(conf); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java index dba21459c0af8fe87363136ed3b77118748ef963..77537a2d2d43c16308ab2e6e7f0652ff01712dd6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java @@ -18,39 +18,11 @@ package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; public class WebMonitorConfig { - // ------------------------------------------------------------------------ - // Config Keys - // ------------------------------------------------------------------------ - - /** The port for the runtime monitor web-frontend server. */ - public static final String JOB_MANAGER_WEB_PORT_KEY = ConfigConstants.JOB_MANAGER_WEB_PORT_KEY; - - /** The initial refresh interval for the web dashboard */ - public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = "jobmanager.web.refresh-interval"; - - - // ------------------------------------------------------------------------ - // Default values - // ------------------------------------------------------------------------ - - /** Default port for the web dashboard (= 8081) */ - public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT; - - /** Default refresh interval for the web dashboard (= 3000 msecs) */ - public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000; - - - // ------------------------------------------------------------------------ - // Config - // ------------------------------------------------------------------------ - /** The configuration queried by this config object */ private final Configuration config; @@ -67,17 +39,15 @@ public class WebMonitorConfig { } public int getWebFrontendPort() { - return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); + return config.getInteger(JobManagerOptions.WEB_PORT); } public long getRefreshInterval() { - return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL); + return config.getLong(JobManagerOptions.WEB_REFRESH_INTERVAL); } public boolean isProgramSubmitEnabled() { - return config.getBoolean( - ConfigConstants.JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED); + return config.getBoolean(JobManagerOptions.WEB_SUBMIT_ENABLE); } public String getAllowOrigin() { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index f83fa272ab5b173545482834498ac6ee855a077a..03b53ad245792a8c05f9475a9c89eb2e60eb60b7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -25,6 +25,7 @@ import io.netty.handler.codec.http.router.Router; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -191,21 +192,13 @@ public class WebRuntimeMonitor implements WebMonitor { stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000); // Back pressure stats tracker config - int cleanUpInterval = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL); + int cleanUpInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_CLEANUP_INTERVAL); - int refreshInterval = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL); + int refreshInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_REFRESH_INTERVAL); - int numSamples = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES); + int numSamples = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_NUM_SAMPLES); - int delay = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY); + int delay = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_DELAY); Time delayBetweenSamples = Time.milliseconds(delay); @@ -219,10 +212,7 @@ public class WebRuntimeMonitor implements WebMonitor { ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService); // Config to enable https access to the web-ui - boolean enableSSL = config.getBoolean( - ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && - SSLUtils.getSSLEnabled(config); + boolean enableSSL = config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config); if (enableSSL) { LOG.info("Enabling ssl for the web frontend"); @@ -310,9 +300,7 @@ public class WebRuntimeMonitor implements WebMonitor { // DELETE is the preferred way of stopping a job (Rest-conform) DELETE(router, new JobStoppingHandler()); - int maxCachedEntries = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); + int maxCachedEntries = config.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE); CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries); // Register the checkpoint stats handlers @@ -525,14 +513,14 @@ public class WebRuntimeMonitor implements WebMonitor { } private String getBaseDirStr(Configuration configuration) { - return configuration.getString(ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY, System.getProperty("java.io.tmpdir")); + return configuration.getString(JobManagerOptions.WEB_TMP_DIR); } private File getUploadDir(Configuration configuration) { - File baseDir = new File(configuration.getString(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY, + File baseDir = new File(configuration.getString(JobManagerOptions.WEB_UPLOAD_DIR, getBaseDirStr(configuration))); - boolean uploadDirSpecified = configuration.containsKey(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY); + boolean uploadDirSpecified = configuration.contains(JobManagerOptions.WEB_UPLOAD_DIR); return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID()); } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 5ccfe909218043ee522a6f042e68da1f05b7369d..a51a2340b985fa598a7334a4707ae3cb0d9751cc 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -137,8 +138,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath()); Files.createFile(new File(logDir, "jobmanager.out").toPath()); - config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); + config.setInteger(JobManagerOptions.WEB_PORT, 0); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( config, @@ -286,8 +287,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { Files.createFile(new File(logDir, "jobmanager.out").toPath()); final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); + config.setInteger(JobManagerOptions.WEB_PORT, 0); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString()); @@ -463,8 +464,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { // Web frontend on random port Configuration config = new Configuration(); - config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); + config.setInteger(JobManagerOptions.WEB_PORT, 0); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); WebRuntimeMonitor webMonitor = new WebRuntimeMonitor( config, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 88863e485197e542042eaee7bda84aa4a6fe27ef..aa28fbcae24d3eb9eedcdda2332fc7dd1e75e9ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -204,9 +205,7 @@ public class ExecutionGraphBuilder { } // Maximum number of remembered checkpoints - int historySize = jobManagerConfig.getInteger( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); + int historySize = jobManagerConfig.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE); CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker( historySize, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 2baadb50b2fa75078f6324acef07fe68c7417ebd..dd9527ea328f0a19e7da7a90c55c2cd9ed90547e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -25,8 +25,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import java.net.URI; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; @@ -80,14 +80,14 @@ public final class WebMonitorUtils { if (logFilePath == null) { LOG.warn("Log file environment variable '{}' is not set.", logEnv); - logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null); + logFilePath = config.getString(JobManagerOptions.WEB_LOG_PATH); } // not configured, cannot serve log files if (logFilePath == null || logFilePath.length() < 4) { LOG.warn("JobManager log files are unavailable in the web dashboard. " + "Log file location not found in environment variable '{}' or configuration key '{}'.", - logEnv, ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY); + logEnv, JobManagerOptions.WEB_LOG_PATH.key()); return new LogFileLocation(null, null); } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 5092643c490e5fb90ee8bbb757ecc07b0d99e568..57a6415c31975eba28df31cb1ba86798d07b40c9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2532,8 +2532,7 @@ object JobManager { val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration) - val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) + val archiveCount = configuration.getInteger(JobManagerOptions.WEB_ARCHIVE_COUNT) val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index aad3a4b74c33b3bf7d955bb48dd5f20921f3b05d..97117d214eb40c663a5a46822b787889bdbcce70 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1661,10 +1661,6 @@ public abstract class StreamExecutionEnvironment { public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) { checkNotNull(conf, "conf"); - if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) { - int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT; - conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port); - } conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index f96ab3de47d7b1d8645c2ec2dcf2fbfea7359e0d..437dd5f1deb07014265d114b1c52d2eff2deca42 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.messages.TaskManagerMessages; @@ -148,8 +149,8 @@ public class TestBaseUtils extends TestLogger { config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s"); config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT); - config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); + config.setInteger(JobManagerOptions.WEB_PORT, 8081); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index 003eb0c57e71e8394f67f2576f7688b49b9bd1fa..538ac98370f7aad97de19b998f7b475fd22b0b63 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -24,12 +24,15 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.io.FileUtils; +import org.apache.commons.math3.optim.nonlinear.vector.JacobianMultivariateVectorOptimizer; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.JobManagerCliOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testutils.StoppableInvokable; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; @@ -84,7 +87,7 @@ public class WebFrontendITCase extends TestLogger { Files.createFile(logFile.toPath()); Files.createFile(outFile.toPath()); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath()); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); cluster = new LocalFlinkMiniCluster(config, false); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index b62f957363bc0c601753ddd68cede65b4e8b1b34..64417f64acacfbdc6d3e882b6e0c367e8ff296df 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -372,8 +372,7 @@ public class YarnApplicationMasterRunner { LOG); String protocol = "http://"; - if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + if (config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { protocol = "https://"; } final String webMonitorURL = webMonitor == null ? null :