From c1025470a91376e251f966fc58cb5cb1c59c7c66 Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 5 May 2017 12:39:55 +0200 Subject: [PATCH] [FLINK-6461] Deprecate web config defaults in ConfigConstants This closes #3831. --- .../program/StandaloneClusterClient.java | 5 +- .../flink/configuration/ConfigConstants.java | 65 ++++++++++++++++--- .../configuration/JobManagerOptions.java | 7 ++ .../flink/api/java/ExecutionEnvironment.java | 4 -- .../runtime/webmonitor/WebMonitorConfig.java | 36 +--------- .../runtime/webmonitor/WebRuntimeMonitor.java | 32 +++------ .../webmonitor/WebRuntimeMonitorITCase.java | 13 ++-- .../executiongraph/ExecutionGraphBuilder.java | 5 +- .../runtime/webmonitor/WebMonitorUtils.java | 6 +- .../flink/runtime/jobmanager/JobManager.scala | 3 +- .../StreamExecutionEnvironment.java | 4 -- .../apache/flink/test/util/TestBaseUtils.java | 5 +- .../flink/test/web/WebFrontendITCase.java | 5 +- .../yarn/YarnApplicationMasterRunner.java | 3 +- 14 files changed, 98 insertions(+), 95 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index fd179c0e0a9..7517504ba08 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -18,8 +18,8 @@ package org.apache.flink.client.program; import org.apache.flink.api.common.JobSubmissionResult; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -53,8 +53,7 @@ public class StandaloneClusterClient extends ClusterClient { @Override public String getWebInterfaceURL() { String host = this.getJobManagerAddress().getHostString(); - int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); + int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT); return "http://" + host + ":" + port; } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 92e6b5d018e..c3704be43e2 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1338,36 +1338,81 @@ public final class ConfigConstants { key("jobmanager.web.address") .noDefaultValue(); - /** The config key for the port of the JobManager web frontend. - * Setting this value to {@code -1} disables the web frontend. */ + /** + * The config key for the port of the JobManager web frontend. + * Setting this value to {@code -1} disables the web frontend. + * + * @deprecated use {@link JobManagerOptions#WEB_PORT} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081; - /** Default value to override SSL support for the JobManager web UI */ + /** + * Default value to override SSL support for the JobManager web UI + * + * @deprecated use {@link JobManagerOptions#WEB_SSL_ENABLED} instead + */ + @Deprecated public static final boolean DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED = true; - /** The default number of archived jobs for the jobmanager */ + /** + * The default number of archived jobs for the jobmanager + * + * @deprecated use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5; - /** By default, submitting jobs from the web-frontend is allowed. */ + /** + * By default, submitting jobs from the web-frontend is allowed. + * + * @deprecated use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead + */ + @Deprecated public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true; /** @deprecated Config key has been deprecated. Therefore, no default value required. */ @Deprecated public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false; - /** Default number of checkpoints to remember for recent history. */ + /** + * Default number of checkpoints to remember for recent history. + * + * @deprecated use {@link JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10; - /** Time after which cached stats are cleaned up. */ + /** + * Time after which cached stats are cleaned up. + * + * @@deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000; - /** Time after which available stats are deprecated and need to be refreshed (by resampling). */ + /** + * Time after which available stats are deprecated and need to be refreshed (by resampling). + * + * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000; - /** Number of samples to take to determine back pressure. */ + /** + * Number of samples to take to determine back pressure. + * + * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100; - /** Delay between samples to determine back pressure. */ + /** + * Delay between samples to determine back pressure. + * + * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} instead + */ + @Deprecated public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 50; // ------------------------------ Akka Values ------------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index b924e8e23a3..76b6bed590b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -115,6 +115,13 @@ public class JobManagerOptions { key("jobmanager.web.access-control-allow-origin") .defaultValue("*"); + /** + * The config parameter defining the refresh interval for the web-frontend. + */ + public static final ConfigOption WEB_REFRESH_INTERVAL = + key("jobmanager.web.refresh-interval") + .defaultValue(3000L); + /** * Config parameter to override SSL support for the JobManager Web UI */ diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 709ef094018..3d8a38471c9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -1235,10 +1235,6 @@ public abstract class ExecutionEnvironment { public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) { checkNotNull(conf, "conf"); - if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) { - int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT; - conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port); - } conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); LocalEnvironment localEnv = new LocalEnvironment(conf); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java index dba21459c0a..77537a2d2d4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java @@ -18,39 +18,11 @@ package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; public class WebMonitorConfig { - // ------------------------------------------------------------------------ - // Config Keys - // ------------------------------------------------------------------------ - - /** The port for the runtime monitor web-frontend server. */ - public static final String JOB_MANAGER_WEB_PORT_KEY = ConfigConstants.JOB_MANAGER_WEB_PORT_KEY; - - /** The initial refresh interval for the web dashboard */ - public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = "jobmanager.web.refresh-interval"; - - - // ------------------------------------------------------------------------ - // Default values - // ------------------------------------------------------------------------ - - /** Default port for the web dashboard (= 8081) */ - public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT; - - /** Default refresh interval for the web dashboard (= 3000 msecs) */ - public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000; - - - // ------------------------------------------------------------------------ - // Config - // ------------------------------------------------------------------------ - /** The configuration queried by this config object */ private final Configuration config; @@ -67,17 +39,15 @@ public class WebMonitorConfig { } public int getWebFrontendPort() { - return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); + return config.getInteger(JobManagerOptions.WEB_PORT); } public long getRefreshInterval() { - return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL); + return config.getLong(JobManagerOptions.WEB_REFRESH_INTERVAL); } public boolean isProgramSubmitEnabled() { - return config.getBoolean( - ConfigConstants.JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED); + return config.getBoolean(JobManagerOptions.WEB_SUBMIT_ENABLE); } public String getAllowOrigin() { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index f83fa272ab5..03b53ad2457 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -25,6 +25,7 @@ import io.netty.handler.codec.http.router.Router; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -191,21 +192,13 @@ public class WebRuntimeMonitor implements WebMonitor { stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000); // Back pressure stats tracker config - int cleanUpInterval = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL); + int cleanUpInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_CLEANUP_INTERVAL); - int refreshInterval = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL); + int refreshInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_REFRESH_INTERVAL); - int numSamples = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES); + int numSamples = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_NUM_SAMPLES); - int delay = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY); + int delay = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_DELAY); Time delayBetweenSamples = Time.milliseconds(delay); @@ -219,10 +212,7 @@ public class WebRuntimeMonitor implements WebMonitor { ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService); // Config to enable https access to the web-ui - boolean enableSSL = config.getBoolean( - ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && - SSLUtils.getSSLEnabled(config); + boolean enableSSL = config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config); if (enableSSL) { LOG.info("Enabling ssl for the web frontend"); @@ -310,9 +300,7 @@ public class WebRuntimeMonitor implements WebMonitor { // DELETE is the preferred way of stopping a job (Rest-conform) DELETE(router, new JobStoppingHandler()); - int maxCachedEntries = config.getInteger( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); + int maxCachedEntries = config.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE); CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries); // Register the checkpoint stats handlers @@ -525,14 +513,14 @@ public class WebRuntimeMonitor implements WebMonitor { } private String getBaseDirStr(Configuration configuration) { - return configuration.getString(ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY, System.getProperty("java.io.tmpdir")); + return configuration.getString(JobManagerOptions.WEB_TMP_DIR); } private File getUploadDir(Configuration configuration) { - File baseDir = new File(configuration.getString(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY, + File baseDir = new File(configuration.getString(JobManagerOptions.WEB_UPLOAD_DIR, getBaseDirStr(configuration))); - boolean uploadDirSpecified = configuration.containsKey(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY); + boolean uploadDirSpecified = configuration.contains(JobManagerOptions.WEB_UPLOAD_DIR); return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID()); } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 5ccfe909218..a51a2340b98 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -137,8 +138,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath()); Files.createFile(new File(logDir, "jobmanager.out").toPath()); - config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); + config.setInteger(JobManagerOptions.WEB_PORT, 0); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( config, @@ -286,8 +287,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { Files.createFile(new File(logDir, "jobmanager.out").toPath()); final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); + config.setInteger(JobManagerOptions.WEB_PORT, 0); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString()); @@ -463,8 +464,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { // Web frontend on random port Configuration config = new Configuration(); - config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); + config.setInteger(JobManagerOptions.WEB_PORT, 0); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); WebRuntimeMonitor webMonitor = new WebRuntimeMonitor( config, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 88863e48519..aa28fbcae24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -204,9 +205,7 @@ public class ExecutionGraphBuilder { } // Maximum number of remembered checkpoints - int historySize = jobManagerConfig.getInteger( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); + int historySize = jobManagerConfig.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE); CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker( historySize, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 2baadb50b2f..dd9527ea328 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -25,8 +25,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import java.net.URI; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; @@ -80,14 +80,14 @@ public final class WebMonitorUtils { if (logFilePath == null) { LOG.warn("Log file environment variable '{}' is not set.", logEnv); - logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null); + logFilePath = config.getString(JobManagerOptions.WEB_LOG_PATH); } // not configured, cannot serve log files if (logFilePath == null || logFilePath.length() < 4) { LOG.warn("JobManager log files are unavailable in the web dashboard. " + "Log file location not found in environment variable '{}' or configuration key '{}'.", - logEnv, ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY); + logEnv, JobManagerOptions.WEB_LOG_PATH.key()); return new LogFileLocation(null, null); } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 5092643c490..57a6415c319 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2532,8 +2532,7 @@ object JobManager { val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration) - val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) + val archiveCount = configuration.getInteger(JobManagerOptions.WEB_ARCHIVE_COUNT) val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index aad3a4b74c3..97117d214eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1661,10 +1661,6 @@ public abstract class StreamExecutionEnvironment { public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) { checkNotNull(conf, "conf"); - if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) { - int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT; - conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port); - } conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index f96ab3de47d..437dd5f1deb 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.messages.TaskManagerMessages; @@ -148,8 +149,8 @@ public class TestBaseUtils extends TestLogger { config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s"); config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT); - config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); + config.setInteger(JobManagerOptions.WEB_PORT, 8081); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index 003eb0c57e7..538ac98370f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -24,12 +24,15 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.io.FileUtils; +import org.apache.commons.math3.optim.nonlinear.vector.JacobianMultivariateVectorOptimizer; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.JobManagerCliOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testutils.StoppableInvokable; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; @@ -84,7 +87,7 @@ public class WebFrontendITCase extends TestLogger { Files.createFile(logFile.toPath()); Files.createFile(outFile.toPath()); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath()); + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); cluster = new LocalFlinkMiniCluster(config, false); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index b62f957363b..64417f64aca 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -372,8 +372,7 @@ public class YarnApplicationMasterRunner { LOG); String protocol = "http://"; - if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + if (config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { protocol = "https://"; } final String webMonitorURL = webMonitor == null ? null : -- GitLab