提交 c1025470 编写于 作者: Z zentol

[FLINK-6461] Deprecate web config defaults in ConfigConstants

This closes #3831.
上级 4ab39381
......@@ -18,8 +18,8 @@
package org.apache.flink.client.program;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
......@@ -53,8 +53,7 @@ public class StandaloneClusterClient extends ClusterClient {
@Override
public String getWebInterfaceURL() {
String host = this.getJobManagerAddress().getHostString();
int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT);
return "http://" + host + ":" + port;
}
......
......@@ -1338,36 +1338,81 @@ public final class ConfigConstants {
key("jobmanager.web.address")
.noDefaultValue();
/** The config key for the port of the JobManager web frontend.
* Setting this value to {@code -1} disables the web frontend. */
/**
* The config key for the port of the JobManager web frontend.
* Setting this value to {@code -1} disables the web frontend.
*
* @deprecated use {@link JobManagerOptions#WEB_PORT} instead
*/
@Deprecated
public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
/** Default value to override SSL support for the JobManager web UI */
/**
* Default value to override SSL support for the JobManager web UI
*
* @deprecated use {@link JobManagerOptions#WEB_SSL_ENABLED} instead
*/
@Deprecated
public static final boolean DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED = true;
/** The default number of archived jobs for the jobmanager */
/**
* The default number of archived jobs for the jobmanager
*
* @deprecated use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead
*/
@Deprecated
public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5;
/** By default, submitting jobs from the web-frontend is allowed. */
/**
* By default, submitting jobs from the web-frontend is allowed.
*
* @deprecated use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead
*/
@Deprecated
public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true;
/** @deprecated Config key has been deprecated. Therefore, no default value required. */
@Deprecated
public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false;
/** Default number of checkpoints to remember for recent history. */
/**
* Default number of checkpoints to remember for recent history.
*
* @deprecated use {@link JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead
*/
@Deprecated
public static final int DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10;
/** Time after which cached stats are cleaned up. */
/**
* Time after which cached stats are cleaned up.
*
* @@deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead
*/
@Deprecated
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000;
/** Time after which available stats are deprecated and need to be refreshed (by resampling). */
/**
* Time after which available stats are deprecated and need to be refreshed (by resampling).
*
* @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead
*/
@Deprecated
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000;
/** Number of samples to take to determine back pressure. */
/**
* Number of samples to take to determine back pressure.
*
* @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead
*/
@Deprecated
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100;
/** Delay between samples to determine back pressure. */
/**
* Delay between samples to determine back pressure.
*
* @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} instead
*/
@Deprecated
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 50;
// ------------------------------ Akka Values ------------------------------
......
......@@ -115,6 +115,13 @@ public class JobManagerOptions {
key("jobmanager.web.access-control-allow-origin")
.defaultValue("*");
/**
* The config parameter defining the refresh interval for the web-frontend.
*/
public static final ConfigOption<Long> WEB_REFRESH_INTERVAL =
key("jobmanager.web.refresh-interval")
.defaultValue(3000L);
/**
* Config parameter to override SSL support for the JobManager Web UI
*/
......
......@@ -1235,10 +1235,6 @@ public abstract class ExecutionEnvironment {
public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
checkNotNull(conf, "conf");
if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
}
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
LocalEnvironment localEnv = new LocalEnvironment(conf);
......
......@@ -18,39 +18,11 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
public class WebMonitorConfig {
// ------------------------------------------------------------------------
// Config Keys
// ------------------------------------------------------------------------
/** The port for the runtime monitor web-frontend server. */
public static final String JOB_MANAGER_WEB_PORT_KEY = ConfigConstants.JOB_MANAGER_WEB_PORT_KEY;
/** The initial refresh interval for the web dashboard */
public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = "jobmanager.web.refresh-interval";
// ------------------------------------------------------------------------
// Default values
// ------------------------------------------------------------------------
/** Default port for the web dashboard (= 8081) */
public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
/** Default refresh interval for the web dashboard (= 3000 msecs) */
public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000;
// ------------------------------------------------------------------------
// Config
// ------------------------------------------------------------------------
/** The configuration queried by this config object */
private final Configuration config;
......@@ -67,17 +39,15 @@ public class WebMonitorConfig {
}
public int getWebFrontendPort() {
return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
return config.getInteger(JobManagerOptions.WEB_PORT);
}
public long getRefreshInterval() {
return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL);
return config.getLong(JobManagerOptions.WEB_REFRESH_INTERVAL);
}
public boolean isProgramSubmitEnabled() {
return config.getBoolean(
ConfigConstants.JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED);
return config.getBoolean(JobManagerOptions.WEB_SUBMIT_ENABLE);
}
public String getAllowOrigin() {
......
......@@ -25,6 +25,7 @@ import io.netty.handler.codec.http.router.Router;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
......@@ -191,21 +192,13 @@ public class WebRuntimeMonitor implements WebMonitor {
stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
// Back pressure stats tracker config
int cleanUpInterval = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL);
int cleanUpInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_CLEANUP_INTERVAL);
int refreshInterval = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL);
int refreshInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_REFRESH_INTERVAL);
int numSamples = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES);
int numSamples = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_NUM_SAMPLES);
int delay = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY);
int delay = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_DELAY);
Time delayBetweenSamples = Time.milliseconds(delay);
......@@ -219,10 +212,7 @@ public class WebRuntimeMonitor implements WebMonitor {
ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService);
// Config to enable https access to the web-ui
boolean enableSSL = config.getBoolean(
ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) &&
SSLUtils.getSSLEnabled(config);
boolean enableSSL = config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config);
if (enableSSL) {
LOG.info("Enabling ssl for the web frontend");
......@@ -310,9 +300,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// DELETE is the preferred way of stopping a job (Rest-conform)
DELETE(router, new JobStoppingHandler());
int maxCachedEntries = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
int maxCachedEntries = config.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
// Register the checkpoint stats handlers
......@@ -525,14 +513,14 @@ public class WebRuntimeMonitor implements WebMonitor {
}
private String getBaseDirStr(Configuration configuration) {
return configuration.getString(ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY, System.getProperty("java.io.tmpdir"));
return configuration.getString(JobManagerOptions.WEB_TMP_DIR);
}
private File getUploadDir(Configuration configuration) {
File baseDir = new File(configuration.getString(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY,
File baseDir = new File(configuration.getString(JobManagerOptions.WEB_UPLOAD_DIR,
getBaseDirStr(configuration)));
boolean uploadDirSpecified = configuration.containsKey(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY);
boolean uploadDirSpecified = configuration.contains(JobManagerOptions.WEB_UPLOAD_DIR);
return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID());
}
}
......@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
......@@ -137,8 +138,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
config.setInteger(JobManagerOptions.WEB_PORT, 0);
config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
config,
......@@ -286,8 +287,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
Files.createFile(new File(logDir, "jobmanager.out").toPath());
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
config.setInteger(JobManagerOptions.WEB_PORT, 0);
config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString());
......@@ -463,8 +464,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
// Web frontend on random port
Configuration config = new Configuration();
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
config.setInteger(JobManagerOptions.WEB_PORT, 0);
config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
config,
......
......@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
......@@ -204,9 +205,7 @@ public class ExecutionGraphBuilder {
}
// Maximum number of remembered checkpoints
int historySize = jobManagerConfig.getInteger(
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
int historySize = jobManagerConfig.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
historySize,
......
......@@ -25,8 +25,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.net.URI;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
......@@ -80,14 +80,14 @@ public final class WebMonitorUtils {
if (logFilePath == null) {
LOG.warn("Log file environment variable '{}' is not set.", logEnv);
logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
logFilePath = config.getString(JobManagerOptions.WEB_LOG_PATH);
}
// not configured, cannot serve log files
if (logFilePath == null || logFilePath.length() < 4) {
LOG.warn("JobManager log files are unavailable in the web dashboard. " +
"Log file location not found in environment variable '{}' or configuration key '{}'.",
logEnv, ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
logEnv, JobManagerOptions.WEB_LOG_PATH.key());
return new LogFileLocation(null, null);
}
......
......@@ -2532,8 +2532,7 @@ object JobManager {
val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration)
val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
val archiveCount = configuration.getInteger(JobManagerOptions.WEB_ARCHIVE_COUNT)
val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR)
......
......@@ -1661,10 +1661,6 @@ public abstract class StreamExecutionEnvironment {
public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
checkNotNull(conf, "conf");
if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
}
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
......
......@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.messages.TaskManagerMessages;
......@@ -148,8 +149,8 @@ public class TestBaseUtils extends TestLogger {
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
config.setInteger(JobManagerOptions.WEB_PORT, 8081);
config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
......
......@@ -24,12 +24,15 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.commons.io.FileUtils;
import org.apache.commons.math3.optim.nonlinear.vector.JacobianMultivariateVectorOptimizer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobManagerCliOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
......@@ -84,7 +87,7 @@ public class WebFrontendITCase extends TestLogger {
Files.createFile(logFile.toPath());
Files.createFile(outFile.toPath());
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath());
config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
cluster = new LocalFlinkMiniCluster(config, false);
......
......@@ -372,8 +372,7 @@ public class YarnApplicationMasterRunner {
LOG);
String protocol = "http://";
if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
if (config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
protocol = "https://";
}
final String webMonitorURL = webMonitor == null ? null :
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册