[FLINK-5974] [mesos] Make mesos service name configurable for dns lookups

上级 1e53b75e
......@@ -263,3 +263,7 @@ May be set to -1 to disable this feature.
`mesos.resourcemanager.tasks.container.image.name`: Image name to use for the container (**NO DEFAULT**)
`mesos.resourcemanager.tasks.container.volumes`: A comma seperated list of [host_path:]container_path[:RO|RW]. This allows for mounting additional volumes into your container. (**NO DEFAULT**)
`mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)
`mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**).
......@@ -1286,10 +1286,6 @@ public final class ConfigConstants {
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
public static final String MESOS_RESOURCEMANAGER_TASKS_HOSTNAME = "mesos.resourcemanager.tasks.hostname";
public static final String MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD = "mesos.resourcemanager.tasks.cmd-prefix";
/** Default value to override SSL support for the Artifact Server */
public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
......
......@@ -34,11 +34,13 @@ import org.apache.flink.util.Preconditions;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import static org.apache.flink.mesos.Utils.variable;
import static org.apache.flink.mesos.Utils.range;
......@@ -200,9 +202,15 @@ public class LaunchableMesosWorker implements LaunchableTask {
final StringBuilder jvmArgs = new StringBuilder();
//configure task manager hostname property if hostname override property is supplied
if(params.getTaskManagerHostname().isDefined()) {
final String taskManagerHostName = params.getTaskManagerHostname().get().replace("_TASK",taskID.getValue());
dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostName);
Option<String> taskManagerHostnameOption = params.getTaskManagerHostname();
if(taskManagerHostnameOption.isDefined()) {
// replace the TASK_ID pattern by the actual task id value of the Mesos task
final String taskManagerHostname = MesosTaskManagerParameters.TASK_ID_PATTERN
.matcher(taskManagerHostnameOption.get())
.replaceAll(Matcher.quoteReplacement(taskID.getValue()));
dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostname);
}
// use the assigned ports for the TM
......@@ -251,13 +259,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, mesosConfiguration.frameworkInfo().getName()));
// build the launch command w/ dynamic application properties
StringBuilder launchCommand = new StringBuilder();
if(params.bootstrapCommand().isDefined()) {
launchCommand.append(params.bootstrapCommand().get()).append(" && ");
}
launchCommand.append("$FLINK_HOME/bin/mesos-taskmanager.sh ");
launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
cmd.setValue(launchCommand.toString());
Option<String> bootstrapCmdOption = params.bootstrapCommand();
final String bootstrapCommand = bootstrapCmdOption.isDefined() ? bootstrapCmdOption.get() + " && " : "";
final String launchCommand = bootstrapCommand + "$FLINK_HOME/bin/mesos-taskmanager.sh " + ContainerSpecification.formatSystemProperties(dynamicProperties);
cmd.setValue(launchCommand);
// build the container info
Protos.ContainerInfo.Builder containerInfo = Protos.ContainerInfo.newBuilder();
......
......@@ -32,6 +32,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
......@@ -210,18 +211,10 @@ public class MesosApplicationMasterRunner {
try {
// ------- (1) load and parse / validate all configurations -------
final String appMasterHostname;
//We will use JM RPC address property if it is supplied through configuration
final String jmRpcAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
if(jmRpcAddress != null) {
LOG.info("JM RPC address from Flink configuration file: {} ", jmRpcAddress);
appMasterHostname = jmRpcAddress;
} else {
// Note that we use the "appMasterHostname" given by the system, to make sure
// we use the hostnames consistently throughout akka.
// for akka "localhost" and "localhost.localdomain" are different actors.
appMasterHostname = InetAddress.getLocalHost().getHostName();
}
final String appMasterHostname = config.getString(
JobManagerOptions.ADDRESS,
InetAddress.getLocalHost().getHostName());
LOG.info("App Master Hostname to use: {}", appMasterHostname);
// Mesos configuration
......
......@@ -669,8 +669,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
LaunchableMesosWorker launchable =
new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec,
taskID, mesosConfig);
new LaunchableMesosWorker(
artifactResolver,
taskManagerParameters,
taskManagerContainerSpec,
taskID,
mesosConfig);
return launchable;
}
......
......@@ -33,6 +33,7 @@ import scala.Option;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import static org.apache.flink.configuration.ConfigOptions.key;
......@@ -44,33 +45,36 @@ import static org.apache.flink.configuration.ConfigOptions.key;
*/
public class MesosTaskManagerParameters {
/** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task */
public static final Pattern TASK_ID_PATTERN = Pattern.compile("_TASK_", Pattern.LITERAL);
public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
.defaultValue(1);
key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
.defaultValue(1);
public static final ConfigOption<Integer> MESOS_RM_TASKS_MEMORY_MB =
key("mesos.resourcemanager.tasks.mem")
.defaultValue(1024);
key("mesos.resourcemanager.tasks.mem")
.defaultValue(1024);
public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
key("mesos.resourcemanager.tasks.cpus")
.defaultValue(0.0);
key("mesos.resourcemanager.tasks.cpus")
.defaultValue(0.0);
public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE =
key("mesos.resourcemanager.tasks.container.type")
.defaultValue("mesos");
.defaultValue("mesos");
public static final ConfigOption<String> MESOS_RM_CONTAINER_IMAGE_NAME =
key("mesos.resourcemanager.tasks.container.image.name")
.noDefaultValue();
.noDefaultValue();
public static final ConfigOption<String> MESOS_TM_HOSTNAME =
key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_HOSTNAME)
.noDefaultValue();
key("mesos.resourcemanager.tasks.hostname")
.noDefaultValue();
public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD)
.noDefaultValue();
key("mesos.resourcemanager.tasks.bootstrap-cmd")
.noDefaultValue();
public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
key("mesos.resourcemanager.tasks.container.volumes")
......@@ -78,7 +82,7 @@ public class MesosTaskManagerParameters {
public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
key("mesos.constraints.hard.hostattribute")
.noDefaultValue();
.noDefaultValue();
/**
* Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Mesos containerizer.
......@@ -237,7 +241,7 @@ public class MesosTaskManagerParameters {
List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
//obtain Task Manager Host Name from the configuration
Option<String> tmHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
//obtain bootstrap command from the configuration
Option<String> tmBootstrapCommand = Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));
......@@ -250,7 +254,7 @@ public class MesosTaskManagerParameters {
containerVolumes,
constraints,
tmBootstrapCommand,
tmHostname);
taskManagerHostname);
}
private static List<ConstraintEvaluator> parseConstraints(String mesosConstraints) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册