提交 d63d704e 编写于 作者: Z zjureel 提交者: zentol

[FLINK-6494] [RM][Yarn][Mesos] Migrate ResourceManager/Yarn/Mesos configuration options

This closes #4075.
上级 65391805
......@@ -135,7 +135,9 @@ public final class ConfigConstants {
/**
* The config parameter defining the network port to connect to
* for communication with the resource manager.
* @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
*/
@Deprecated
public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port";
/**
......@@ -349,12 +351,16 @@ public final class ConfigConstants {
/**
* Percentage of heap space to remove from containers (YARN / Mesos), to compensate
* for other JVM memory usage.
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
*/
@Deprecated
public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio";
/**
* Minimum amount of heap memory to remove in containers, as a safety margin.
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
*/
@Deprecated
public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min";
/**
......@@ -362,13 +368,17 @@ public final class ConfigConstants {
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
* containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* in the flink-conf.yaml.
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead.
*/
@Deprecated
public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
/**
* Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
* setting custom environment variables for the workers (TaskManagers)
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_TASK_MANAGER_ENV_PREFIX} instead.
*/
@Deprecated
public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
......@@ -376,7 +386,9 @@ public final class ConfigConstants {
/**
* The vcores exposed by YARN.
* @deprecated in favor of {@code YarnConfigOptions#VCORES}.
*/
@Deprecated
public static final String YARN_VCORES = "yarn.containers.vcores";
/**
......@@ -406,7 +418,9 @@ public final class ConfigConstants {
* the YARN session / job on YARN.
*
* By default, we take the number of of initially requested containers.
* @deprecated in favor of {@code YarnConfigOptions#MAX_FAILED_CONTAINERS}.
*/
@Deprecated
public static final String YARN_MAX_FAILED_CONTAINERS = "yarn.maximum-failed-containers";
/**
......@@ -414,14 +428,18 @@ public final class ConfigConstants {
* availability mode. This value is usually limited by YARN.
*
* By default, it's 1 in the standalone case and 2 in the high availability case.
* @deprecated in favor of {@code YarnConfigOptions#APPLICATION_ATTEMPTS}.
*/
@Deprecated
public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts";
/**
* The heartbeat interval between the Application Master and the YARN Resource Manager.
*
* The default value is 5 (seconds).
* @deprecated in favor of {@code YarnConfigOptions#HEARTBEAT_DELAY_SECONDS}.
*/
@Deprecated
public static final String YARN_HEARTBEAT_DELAY_SECONDS = "yarn.heartbeat-delay";
/**
......@@ -429,8 +447,10 @@ public final class ConfigConstants {
* processing slots is written into a properties file, so that the Flink client is able
* to pick those details up.
* This configuration parameter allows changing the default location of that file (for example
* for environments sharing a Flink installation between users)
* for environments sharing a Flink installation between users).
* @deprecated in favor of {@code YarnConfigOptions#PROPERTIES_FILE_LOCATION}.
*/
@Deprecated
public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location";
/**
......@@ -474,12 +494,16 @@ public final class ConfigConstants {
* or a list of ranges and or points: "50100-50200,50300-50400,51234"
*
* Setting the port to 0 will let the OS choose an available port.
* @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}.
*/
@Deprecated
public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
/**
* A comma-separated list of strings to use as YARN application tags.
* @deprecated in favor of {@code YarnConfigOptions#APPLICATION_TAGS}.
*/
@Deprecated
public static final String YARN_APPLICATION_TAGS = "yarn.tags";
......@@ -487,7 +511,9 @@ public final class ConfigConstants {
/**
* The initial number of Mesos tasks to allocate.
* @deprecated in favor of {@code MesosOptions#INITIAL_TASKS}.
*/
@Deprecated
public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks";
/**
......@@ -495,7 +521,9 @@ public final class ConfigConstants {
* the Mesos session / job on Mesos.
*
* By default, we take the number of of initially requested tasks.
* @deprecated in favor of {@code MesosOptions#MAX_FAILED_TASKS}.
*/
@Deprecated
public static final String MESOS_MAX_FAILED_TASKS = "mesos.maximum-failed-tasks";
/**
......@@ -510,36 +538,53 @@ public final class ConfigConstants {
* file:///path/to/file (where file contains one of the above)
* }
* </pre>
*
* @deprecated in favor of {@code MesosOptions#MASTER_URL}.
*/
@Deprecated
public static final String MESOS_MASTER_URL = "mesos.master";
/**
* The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
*
* The default value is 600 (seconds).
* @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
*/
@Deprecated
public static final String MESOS_FAILOVER_TIMEOUT_SECONDS = "mesos.failover-timeout";
/**
* The config parameter defining the Mesos artifact server port to use.
* Setting the port to 0 will let the OS choose an available port.
* @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT_KEY}.
*/
@Deprecated
public static final String MESOS_ARTIFACT_SERVER_PORT_KEY = "mesos.resourcemanager.artifactserver.port";
/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}. */
@Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "mesos.resourcemanager.framework.name";
/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
@Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "mesos.resourcemanager.framework.role";
/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_PRINCIPAL}. */
@Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = "mesos.resourcemanager.framework.principal";
/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_SECRET}. */
@Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret";
/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
@Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user";
/**
* Config parameter to override SSL support for the Artifact Server
* @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
*/
@Deprecated
public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = "mesos.resourcemanager.artifactserver.ssl.enabled";
// ------------------------ Hadoop Configuration ------------------------
......@@ -1218,7 +1263,9 @@ public final class ConfigConstants {
/**
* The default network port of the resource manager.
* @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
*/
@Deprecated
public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
/**
......@@ -1378,13 +1425,17 @@ public final class ConfigConstants {
/**
* Minimum amount of memory to subtract from the process memory to get the TaskManager
* heap size. We came up with these values experimentally.
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
*/
@Deprecated
public static final int DEFAULT_YARN_HEAP_CUTOFF = 600;
/**
* Relative amount of memory to subtract from Java process memory to get the TaskManager
* heap size
* heap size.
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
*/
@Deprecated
public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
/**
......@@ -1395,31 +1446,49 @@ public final class ConfigConstants {
/**
* Default port for the application master is 0, which means
* the operating system assigns an ephemeral port
* the operating system assigns an ephemeral port.
* @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}.
*/
@Deprecated
public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0";
// ------ Mesos-Specific Configuration ------
// For more configuration entries please see {@code MesosTaskManagerParameters}.
/** The default failover timeout provided to Mesos (10 mins) */
/**
* The default failover timeout provided to Mesos (10 mins)
* @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
*/
@Deprecated
public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60;
/**
* The default network port to listen on for the Mesos artifact server.
* @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT_KEY}.
*/
@Deprecated
public static final int DEFAULT_MESOS_ARTIFACT_SERVER_PORT = 0;
/**
* The default Mesos framework name for the ResourceManager to use.
* @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}.
*/
@Deprecated
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "Flink";
/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
@Deprecated
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
@Deprecated
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
/** Default value to override SSL support for the Artifact Server */
/**
* Default value to override SSL support for the Artifact Server.
* @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
*/
@Deprecated
public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
// ------------------------ File System Behavior ------------------------
......@@ -1659,8 +1728,16 @@ public final class ConfigConstants {
public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;
/**
* @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
*/
@Deprecated
public static final String LOCAL_NUMBER_RESOURCE_MANAGER = "local.number-resourcemanager";
/**
* @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
*/
@Deprecated
public static final int DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER = 1;
public static final String LOCAL_START_WEBSERVER = "local.start-webserver";
......
......@@ -33,6 +33,45 @@ public class ResourceManagerOptions {
.key("resourcemanager.job.timeout")
.defaultValue("5 minutes");
public static final ConfigOption<Integer> LOCAL_NUMBER_RESOURCE_MANAGER = ConfigOptions
.key("local.number-resourcemanager")
.defaultValue(1);
public static final ConfigOption<Integer> IPC_PORT = ConfigOptions
.key("resourcemanager.rpc.port")
.defaultValue(0);
/**
* Percentage of heap space to remove from containers (YARN / Mesos), to compensate
* for other JVM memory usage.
*/
public static final ConfigOption<Float> CONTAINERIZED_HEAP_CUTOFF_RATIO = ConfigOptions
.key("containerized.heap-cutoff-ratio")
.defaultValue(0.25f)
.withDeprecatedKeys("yarn.heap-cutoff-ratio");
/**
* Minimum amount of heap memory to remove in containers, as a safety margin.
*/
public static final ConfigOption<Integer> CONTAINERIZED_HEAP_CUTOFF_MIN = ConfigOptions
.key("containerized.heap-cutoff-min")
.defaultValue(600)
.withDeprecatedKeys("yarn.heap-cutoff-min");
/**
* Prefix for passing custom environment variables to Flink's master process.
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
* containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* in the flink-conf.yaml.
*/
public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
/**
* Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
* setting custom environment variables for the workers (TaskManagers)
*/
public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
// ---------------------------------------------------------------------------------------------
/** Not intended to be instantiated */
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.mesos.configuration;
import org.apache.flink.configuration.ConfigOption;
import static org.apache.flink.configuration.ConfigOptions.key;
/**
* The set of configuration options relating to mesos settings.
*/
public class MesosOptions {
/**
* The initial number of Mesos tasks to allocate.
*/
public static final ConfigOption<Integer> INITIAL_TASKS =
key("mesos.initial-tasks")
.defaultValue(0);
/**
* The maximum number of failed Mesos tasks before entirely stopping
* the Mesos session / job on Mesos.
*
* <p>By default, we take the number of initially requested tasks.
*/
public static final ConfigOption<Integer> MAX_FAILED_TASKS =
key("mesos.maximum-failed-tasks")
.defaultValue(-1);
/**
* The Mesos master URL.
*
* <p>The value should be in one of the following forms:
* <pre>
* {@code
* host:port
* zk://host1:port1,host2:port2,.../path
* zk://username:password@host1:port1,host2:port2,.../path
* file:///path/to/file (where file contains one of the above)
* }
* </pre>
*/
public static final ConfigOption<String> MASTER_URL =
key("mesos.master")
.noDefaultValue();
/**
* The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
*/
public static final ConfigOption<Integer> FAILOVER_TIMEOUT_SECONDS =
key("mesos.failover-timeout")
.defaultValue(600);
/**
* The config parameter defining the Mesos artifact server port to use.
* Setting the port to 0 will let the OS choose an available port.
*/
public static final ConfigOption<Integer> ARTIFACT_SERVER_PORT =
key("mesos.resourcemanager.artifactserver.port")
.defaultValue(0);
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_NAME =
key("mesos.resourcemanager.framework.name")
.defaultValue("Flink");
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_ROLE =
key("mesos.resourcemanager.framework.role")
.defaultValue("*");
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_PRINCIPAL =
key("mesos.resourcemanager.framework.principal")
.noDefaultValue();
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_SECRET =
key("mesos.resourcemanager.framework.secret")
.noDefaultValue();
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_USER =
key("mesos.resourcemanager.framework.user")
.defaultValue("");
/**
* Config parameter to override SSL support for the Artifact Server.
*/
public static final ConfigOption<Boolean> ARTIFACT_SERVER_SSL_ENABLED =
key("mesos.resourcemanager.artifactserver.ssl.enabled")
.defaultValue(true);
}
......@@ -18,12 +18,12 @@
package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
......@@ -264,8 +264,7 @@ public class MesosApplicationMasterRunner {
// try to start the artifact server
LOG.debug("Starting Artifact Server");
final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
final int artifactServerPort = config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT);
final String artifactServerPrefix = UUID.randomUUID().toString();
artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config);
......@@ -491,42 +490,38 @@ public class MesosApplicationMasterRunner {
.setHostname(hostname);
Protos.Credential.Builder credential = null;
if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured.");
if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
}
String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
Duration failoverTimeout = FiniteDuration.apply(
flinkConfig.getInteger(
ConfigConstants.MESOS_FAILOVER_TIMEOUT_SECONDS,
ConfigConstants.DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS),
MesosOptions.FAILOVER_TIMEOUT_SECONDS),
TimeUnit.SECONDS);
frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
frameworkInfo.setName(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_NAME,
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME));
MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
frameworkInfo.setRole(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
frameworkInfo.setUser(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
frameworkInfo.setPrincipal(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
credential = Protos.Credential.newBuilder();
credential.setPrincipal(frameworkInfo.getPrincipal());
// some environments use a side-channel to communicate the secret to Mesos,
// and thus don't set the 'secret' configuration setting
if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
credential.setSecret(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
}
}
......
......@@ -19,9 +19,9 @@
package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
......@@ -641,7 +641,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
String msg = "Stopping Mesos session because the number of failed tasks ("
+ failedTasksSoFar + ") exceeded the maximum failed tasks ("
+ maxFailedTasks + "). This number is controlled by the '"
+ ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. "
+ MesosOptions.MAX_FAILED_TASKS.key() + "' configuration setting. "
+ "By default its the number of requested tasks.";
LOG.error(msg);
......@@ -757,18 +757,18 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
Logger log) {
final int numInitialTaskManagers = flinkConfig.getInteger(
ConfigConstants.MESOS_INITIAL_TASKS, 0);
MesosOptions.INITIAL_TASKS);
if (numInitialTaskManagers >= 0) {
log.info("Mesos framework to allocate {} initial tasks",
numInitialTaskManagers);
}
else {
throw new IllegalConfigurationException("Invalid value for " +
ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least zero.");
MesosOptions.INITIAL_TASKS.key() + ", which must be at least zero.");
}
final int maxFailedTasks = flinkConfig.getInteger(
ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers);
MesosOptions.MAX_FAILED_TASKS.key(), numInitialTaskManagers);
if (maxFailedTasks >= 0) {
log.info("Mesos framework tolerates {} failed tasks before giving up",
maxFailedTasks);
......
......@@ -18,12 +18,12 @@
package org.apache.flink.mesos.util;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
......@@ -115,8 +115,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
// Config to enable https access to the artifact server
boolean enableSSL = config.getBoolean(
ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) &&
SSLUtils.getSSLEnabled(config);
if (enableSSL) {
......
......@@ -18,8 +18,8 @@
package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
......@@ -105,8 +105,8 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
private static final long serialVersionUID = -952579203067648838L;
{
setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
setInteger(MesosOptions.MAX_FAILED_TASKS, -1);
setInteger(MesosOptions.INITIAL_TASKS, 0);
}};
@BeforeClass
......
......@@ -18,8 +18,8 @@
package org.apache.flink.runtime.clusterframework;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import java.util.HashMap;
......@@ -115,22 +115,20 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
// (1) compute how much memory we subtract from the total memory, to get the Java memory
final float memoryCutoffRatio = config.getFloat(
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
final int minCutoff = config.getInteger(
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
throw new IllegalArgumentException("The configuration value '"
+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given="
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
+ memoryCutoffRatio);
}
if (minCutoff >= containerMemoryMB) {
throw new IllegalArgumentException("The configuration value '"
+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "'='" + minCutoff
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
+ "' is larger than the total container memory " + containerMemoryMB);
}
......@@ -147,7 +145,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
// (3) obtain the additional environment variables from the configuration
final HashMap<String, String> envVars = new HashMap<>();
final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
for (String key : config.keySet()) {
if (key.startsWith(prefix) && key.length() > prefix.length()) {
......
......@@ -28,7 +28,7 @@ import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions}
import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, ResourceManagerOptions, TaskManagerOptions}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
......@@ -168,8 +168,7 @@ abstract class FlinkMiniCluster(
def getNumberOfResourceManagers: Int = {
originalConfiguration.getInteger(
ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
ResourceManagerOptions.LOCAL_NUMBER_RESOURCE_MANAGER
)
}
......@@ -226,8 +225,8 @@ abstract class FlinkMiniCluster(
if (useSingleActorSystem) {
AkkaUtils.getAkkaConfig(originalConfiguration, None)
} else {
val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
val port = originalConfiguration.getInteger(
ResourceManagerOptions.IPC_PORT)
val resolvedPort = if(port != 0) port + index else port
......
......@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.{ActorRef, ActorSystem, Props}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, TaskManagerOptions}
import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, TaskManagerOptions}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
......@@ -183,11 +183,11 @@ class LocalFlinkMiniCluster(
val resourceManagerName = getResourceManagerName(index)
val resourceManagerPort = config.getInteger(
ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
ResourceManagerOptions.IPC_PORT)
if(resourceManagerPort > 0) {
config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
config.setInteger(ResourceManagerOptions.IPC_PORT,
resourceManagerPort + index)
}
val resourceManagerProps = getResourceManagerProps(
......
......@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
......@@ -62,8 +63,8 @@ public class UtilsTest {
@Test
public void testHeapCutoff() {
Configuration conf = new Configuration();
conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15);
conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F);
conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf));
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf));
......@@ -71,14 +72,14 @@ public class UtilsTest {
// test different configuration
Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, "1000");
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.1");
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), "1000");
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.1");
Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.5");
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.5");
Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1");
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
// test also deprecated keys
......@@ -93,21 +94,21 @@ public class UtilsTest {
@Test(expected = IllegalArgumentException.class)
public void illegalArgument() {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1.1");
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
@Test(expected = IllegalArgumentException.class)
public void illegalArgumentNegative() {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "-0.01");
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "-0.01");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
@Test(expected = IllegalArgumentException.class)
public void tooMuchCutoff() {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "6000");
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "6000");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
......
......@@ -18,7 +18,6 @@
package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.client.JobClient;
......@@ -26,6 +25,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -145,7 +145,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3",
"-D" + ConfigConstants.YARN_VCORES + "=2"},
"-D" + YarnConfigOptions.VCORES.key() + "=2"},
"Number of connected TaskManagers changed to 1. Slots available: 3",
RunTypes.YARN_SESSION);
......@@ -186,7 +186,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES));
Assert.assertEquals("2", parsedConfig.get(YarnConfigOptions.VCORES.key()));
// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
// first, get the hostname/port
......
......@@ -253,7 +253,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// The number of cores can be configured in the config.
// If not configured, it is set to the number of task slots
int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, clusterSpecification.getSlotsPerTaskManager());
int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of vcores
if (configuredVcores > numYarnVcores) {
throw new IllegalConfigurationException(
......@@ -261,7 +261,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
" but Yarn only has %d virtual cores available. Please note that the number" +
" of virtual cores is set to the number of task slots by default unless configured" +
" in the Flink config with '%s.'",
configuredVcores, numYarnVcores, ConfigConstants.YARN_VCORES));
configuredVcores, numYarnVcores, YarnConfigOptions.VCORES.key()));
}
// check if required Hadoop environment variables are set. If not, warn user
......@@ -677,7 +677,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
flinkConfiguration.getInteger(
ConfigConstants.YARN_APPLICATION_ATTEMPTS,
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
activateHighAvailabilitySupport(appContext);
......@@ -685,7 +685,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
flinkConfiguration.getInteger(
ConfigConstants.YARN_APPLICATION_ATTEMPTS,
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
1));
}
......@@ -1135,7 +1135,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
IllegalAccessException {
final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, "");
final String tagsString = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);
final Set<String> applicationTags = new HashSet<>();
......
......@@ -18,7 +18,7 @@
package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.security.SecurityUtils;
......@@ -82,24 +82,17 @@ public final class Utils {
*/
public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
BootstrapTools.substituteDeprecatedConfigKey(conf,
ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
BootstrapTools.substituteDeprecatedConfigKey(conf,
ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
int minCutoff = conf.getInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
throw new IllegalArgumentException("The configuration value '"
+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
+ "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
}
if (minCutoff > memory) {
throw new IllegalArgumentException("The configuration value '"
+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
}
......
......@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
......@@ -43,6 +44,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
......@@ -299,8 +301,7 @@ public class YarnApplicationMasterRunner {
// try to start the actor system, JobManager and JobManager actor system
// using the port range definition from the config.
final String amPortRange = config.getString(
ConfigConstants.YARN_APPLICATION_MASTER_PORT,
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
YarnConfigOptions.APPLICATION_MASTER_PORT);
actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange, LOG);
......@@ -518,21 +519,13 @@ public class YarnApplicationMasterRunner {
// corresponding generic config keys instead. that way, later code needs not
// deal with deprecated config keys
BootstrapTools.substituteDeprecatedConfigKey(configuration,
ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
BootstrapTools.substituteDeprecatedConfigKey(configuration,
ConfigConstants.YARN_HEAP_CUTOFF_MIN,
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
return configuration;
}
......
......@@ -18,7 +18,6 @@
package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
......@@ -28,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.messages.ContainersAllocated;
import org.apache.flink.yarn.messages.ContainersComplete;
......@@ -337,7 +337,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
// Resource requirements for worker containers
int taskManagerSlots = taskManagerParameters.numSlots();
int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);
resourceManagerClient.addContainerRequest(
......@@ -550,7 +550,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
String msg = "Stopping YARN session because the number of failed containers ("
+ failedContainersSoFar + ") exceeded the maximum failed containers ("
+ maxFailedContainers + "). This number is controlled by the '"
+ ConfigConstants.YARN_MAX_FAILED_CONTAINERS + "' configuration setting. "
+ YarnConfigOptions.MAX_FAILED_CONTAINERS.key() + "' configuration setting. "
+ "By default its the number of requested containers.";
LOG.error(msg);
......@@ -710,7 +710,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
Logger log) {
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
final long yarnExpiryIntervalMS = yarnConfig.getLong(
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
......@@ -723,7 +723,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
}
final int maxFailedContainers = flinkConfig.getInteger(
ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numInitialTaskManagers);
YarnConfigOptions.MAX_FAILED_CONTAINERS.key(), numInitialTaskManagers);
if (maxFailedContainers >= 0) {
log.info("YARN application tolerates {} failed TaskManager containers before giving up",
maxFailedContainers);
......
......@@ -38,6 +38,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
......@@ -134,7 +135,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
this.yarnConfig = new YarnConfiguration();
this.env = env;
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
final long yarnExpiryIntervalMS = yarnConfig.getLong(
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
......
......@@ -37,6 +37,7 @@ import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptorV2;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
......@@ -773,7 +774,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
String currentUser = System.getProperty("user.name");
String propertiesFileLocation =
conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
conf.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
}
......
......@@ -52,6 +52,71 @@ public class YarnConfigOptions {
key("yarn.per-job-cluster.include-user-jar")
.defaultValue("ORDER");
/**
* The vcores exposed by YARN.
*/
public static final ConfigOption<Integer> VCORES =
key("yarn.containers.vcores")
.defaultValue(-1);
/**
* The maximum number of failed YARN containers before entirely stopping
* the YARN session / job on YARN.
* By default, we take the number of of initially requested containers.
*
* <p>Note: This option returns a String since Integer options must have a static default value.
*/
public static final ConfigOption<String> MAX_FAILED_CONTAINERS =
key("yarn.maximum-failed-containers")
.noDefaultValue();
/**
* Set the number of retries for failed YARN ApplicationMasters/JobManagers in high
* availability mode. This value is usually limited by YARN.
* By default, it's 1 in the standalone case and 2 in the high availability case.
*
* <p>>Note: This option returns a String since Integer options must have a static default value.
*/
public static final ConfigOption<String> APPLICATION_ATTEMPTS =
key("yarn.application-attempts")
.noDefaultValue();
/**
* The heartbeat interval between the Application Master and the YARN Resource Manager.
*/
public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS =
key("yarn.heartbeat-delay")
.defaultValue(5);
/**
* When a Flink job is submitted to YARN, the JobManager's host and the number of available
* processing slots is written into a properties file, so that the Flink client is able
* to pick those details up.
* This configuration parameter allows changing the default location of that file (for example
* for environments sharing a Flink installation between users)
*/
public static final ConfigOption<String> PROPERTIES_FILE_LOCATION =
key("yarn.properties-file.location")
.noDefaultValue();
/**
* The config parameter defining the Akka actor system port for the ApplicationMaster and
* JobManager.
* The port can either be a port, such as "9123",
* a range of ports: "50100-50200"
* or a list of ranges and or points: "50100-50200,50300-50400,51234".
* Setting the port to 0 will let the OS choose an available port.
*/
public static final ConfigOption<String> APPLICATION_MASTER_PORT =
key("yarn.application-master.port")
.defaultValue("0");
/**
* A comma-separated list of strings to use as YARN application tags.
*/
public static final ConfigOption<String> APPLICATION_TAGS =
key("yarn.tags")
.defaultValue("");
// ------------------------------------------------------------------------
......
......@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.security.SecurityContext;
......@@ -112,21 +113,13 @@ public class YarnEntrypointUtils {
// corresponding generic config keys instead. that way, later code needs not
// deal with deprecated config keys
BootstrapTools.substituteDeprecatedConfigKey(configuration,
ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
BootstrapTools.substituteDeprecatedConfigKey(configuration,
ConfigConstants.YARN_HEAP_CUTOFF_MIN,
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
final String keytabPath;
......
......@@ -22,7 +22,7 @@ import java.io.IOException
import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
import akka.actor.ActorRef
import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.clusterframework.ContaineredJobManager
......@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.yarn.configuration.YarnConfigOptions
import scala.concurrent.duration._
import scala.language.postfixOps
......@@ -88,7 +89,7 @@ class YarnJobManager(
val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
val YARN_HEARTBEAT_DELAY: FiniteDuration =
FiniteDuration(
flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
flinkConfiguration.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS),
TimeUnit.SECONDS)
val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))
......
......@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
......@@ -87,7 +88,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
public void testConfigOverwrite() {
Configuration configuration = new Configuration();
// overwrite vcores in config
configuration.setInteger(ConfigConstants.YARN_VCORES, Integer.MAX_VALUE);
configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册