未验证 提交 1e53b75e 编写于 作者: V Vijay Srinivasaraghavan 提交者: Till Rohrmann

[FLINK-5974] [mesos] Added configurations to support mesos-dns hostname resolution

This closes #3692.
上级 b1f18642
......@@ -1286,6 +1286,10 @@ public final class ConfigConstants {
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
public static final String MESOS_RESOURCEMANAGER_TASKS_HOSTNAME = "mesos.resourcemanager.tasks.hostname";
public static final String MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD = "mesos.resourcemanager.tasks.cmd-prefix";
/** Default value to override SSL support for the Artifact Server */
public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
......
......@@ -22,10 +22,12 @@ import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.TaskAssignmentResult;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.VMTaskFitnessCalculator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.Utils;
import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.util.Preconditions;
......@@ -64,6 +66,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
private final MesosTaskManagerParameters params;
private final Protos.TaskID taskID;
private final Request taskRequest;
private final MesosConfiguration mesosConfiguration;
/**
* Construct a launchable Mesos worker.
......@@ -76,11 +79,14 @@ public class LaunchableMesosWorker implements LaunchableTask {
MesosArtifactResolver resolver,
MesosTaskManagerParameters params,
ContainerSpecification containerSpec,
Protos.TaskID taskID) {
Protos.TaskID taskID,
MesosConfiguration mesosConfiguration) {
this.resolver = Preconditions.checkNotNull(resolver);
this.params = Preconditions.checkNotNull(params);
this.containerSpec = Preconditions.checkNotNull(containerSpec);
this.params = Preconditions.checkNotNull(params);
this.taskID = Preconditions.checkNotNull(taskID);
this.mesosConfiguration = Preconditions.checkNotNull(mesosConfiguration);
this.taskRequest = new Request();
}
......@@ -193,6 +199,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
final Protos.Environment.Builder env = cmd.getEnvironmentBuilder();
final StringBuilder jvmArgs = new StringBuilder();
//configure task manager hostname property if hostname override property is supplied
if(params.getTaskManagerHostname().isDefined()) {
final String taskManagerHostName = params.getTaskManagerHostname().get().replace("_TASK",taskID.getValue());
dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostName);
}
// use the assigned ports for the TM
if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) {
throw new IllegalArgumentException("unsufficient # of ports assigned");
......@@ -234,8 +246,16 @@ public class LaunchableMesosWorker implements LaunchableTask {
// finalize JVM args
env.addVariables(variable(MesosConfigKeys.ENV_JVM_ARGS, jvmArgs.toString()));
// populate TASK_NAME and FRAMEWORK_NAME environment variables to the TM container
env.addVariables(variable(MesosConfigKeys.ENV_TASK_NAME, taskInfo.getTaskId().getValue()));
env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, mesosConfiguration.frameworkInfo().getName()));
// build the launch command w/ dynamic application properties
StringBuilder launchCommand = new StringBuilder("$FLINK_HOME/bin/mesos-taskmanager.sh ");
StringBuilder launchCommand = new StringBuilder();
if(params.bootstrapCommand().isDefined()) {
launchCommand.append(params.bootstrapCommand().get()).append(" && ");
}
launchCommand.append("$FLINK_HOME/bin/mesos-taskmanager.sh ");
launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
cmd.setValue(launchCommand.toString());
......
......@@ -210,10 +210,19 @@ public class MesosApplicationMasterRunner {
try {
// ------- (1) load and parse / validate all configurations -------
// Note that we use the "appMasterHostname" given by the system, to make sure
// we use the hostnames consistently throughout akka.
// for akka "localhost" and "localhost.localdomain" are different actors.
final String appMasterHostname = InetAddress.getLocalHost().getHostName();
final String appMasterHostname;
//We will use JM RPC address property if it is supplied through configuration
final String jmRpcAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
if(jmRpcAddress != null) {
LOG.info("JM RPC address from Flink configuration file: {} ", jmRpcAddress);
appMasterHostname = jmRpcAddress;
} else {
// Note that we use the "appMasterHostname" given by the system, to make sure
// we use the hostnames consistently throughout akka.
// for akka "localhost" and "localhost.localdomain" are different actors.
appMasterHostname = InetAddress.getLocalHost().getHostName();
}
LOG.info("App Master Hostname to use: {}", appMasterHostname);
// Mesos configuration
final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
......
......@@ -41,6 +41,16 @@ public class MesosConfigKeys {
*/
public static final String ENV_JVM_ARGS = "JVM_ARGS";
/**
* Standard environment variables used in DCOS environment
*/
public static final String ENV_TASK_NAME = "TASK_NAME";
/**
* Standard environment variables used in DCOS environment
*/
public static final String ENV_FRAMEWORK_NAME = "FRAMEWORK_NAME";
/** Private constructor to prevent instantiation */
private MesosConfigKeys() {}
}
......@@ -669,7 +669,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
LaunchableMesosWorker launchable =
new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec, taskID);
new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec,
taskID, mesosConfig);
return launchable;
}
......
......@@ -64,6 +64,14 @@ public class MesosTaskManagerParameters {
key("mesos.resourcemanager.tasks.container.image.name")
.noDefaultValue();
public static final ConfigOption<String> MESOS_TM_HOSTNAME =
key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_HOSTNAME)
.noDefaultValue();
public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD)
.noDefaultValue();
public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
key("mesos.resourcemanager.tasks.container.volumes")
.noDefaultValue();
......@@ -92,6 +100,10 @@ public class MesosTaskManagerParameters {
private final List<Protos.Volume> containerVolumes;
private final List<ConstraintEvaluator> constraints;
private final Option<String> bootstrapCommand;
private final Option<String> taskManagerHostname;
public MesosTaskManagerParameters(
double cpus,
......@@ -99,7 +111,9 @@ public class MesosTaskManagerParameters {
Option<String> containerImageName,
ContaineredTaskManagerParameters containeredParameters,
List<Protos.Volume> containerVolumes,
List<ConstraintEvaluator> constraints) {
List<ConstraintEvaluator> constraints,
Option<String> bootstrapCommand,
Option<String> taskManagerHostname) {
this.cpus = cpus;
this.containerType = Preconditions.checkNotNull(containerType);
......@@ -107,6 +121,8 @@ public class MesosTaskManagerParameters {
this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
this.containerVolumes = Preconditions.checkNotNull(containerVolumes);
this.constraints = Preconditions.checkNotNull(constraints);
this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
}
......@@ -154,6 +170,16 @@ public class MesosTaskManagerParameters {
return constraints;
}
/**
* Get the taskManager hostname.
*/
public Option<String> getTaskManagerHostname() { return taskManagerHostname; }
/**
* Get the bootstrap command.
*/
public Option<String> bootstrapCommand() { return bootstrapCommand; }
@Override
public String toString() {
return "MesosTaskManagerParameters{" +
......@@ -163,6 +189,8 @@ public class MesosTaskManagerParameters {
", containeredParameters=" + containeredParameters +
", containerVolumes=" + containerVolumes +
", constraints=" + constraints +
", taskManagerHostName=" + taskManagerHostname +
", bootstrapCommand=" + bootstrapCommand +
'}';
}
......@@ -208,13 +236,21 @@ public class MesosTaskManagerParameters {
List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
//obtain Task Manager Host Name from the configuration
Option<String> tmHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
//obtain bootstrap command from the configuration
Option<String> tmBootstrapCommand = Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));
return new MesosTaskManagerParameters(
cpus,
containerType,
Option.apply(imageName),
containeredParameters,
containerVolumes,
constraints);
constraints,
tmBootstrapCommand,
tmHostname);
}
private static List<ConstraintEvaluator> parseConstraints(String mesosConstraints) {
......
......@@ -214,7 +214,14 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams, Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList());
1.0,
MesosTaskManagerParameters.ContainerType.MESOS,
Option.<String>empty(),
containeredParams,
Collections.<Protos.Volume>emptyList(),
Collections.<ConstraintEvaluator>emptyList(),
Option.<String>empty(),
Option.<String>empty());
TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册