From b74594c488e4c5428c5391b89c32ae6fd7d98a79 Mon Sep 17 00:00:00 2001 From: Rune Skou Larsen Date: Mon, 9 Jul 2018 13:54:51 +0200 Subject: [PATCH] [FLINK-9703] Allow TM ports to be exposed through Mesos Maintain a deterministic port ordering, so we can have expectations on which endpoint is behind which port index. This closes #6288. --- .../generated/mesos_configuration.html | 5 ++ .../mesos/configuration/MesosOptions.java | 8 +++ .../LaunchableMesosWorker.java | 34 ++++++++++-- .../LaunchableMesosWorkerTest.java | 55 +++++++++++++++++++ 4 files changed, 96 insertions(+), 6 deletions(-) create mode 100644 flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java diff --git a/docs/_includes/generated/mesos_configuration.html b/docs/_includes/generated/mesos_configuration.html index 16a23886cee..cd0ae2432e3 100644 --- a/docs/_includes/generated/mesos_configuration.html +++ b/docs/_includes/generated/mesos_configuration.html @@ -62,5 +62,10 @@ (none) Mesos framework user + +
mesos.resourcemanager.tasks.port-assignments
+ (none) + Comma-separated list of configuration keys which represent a configurable port.All port keys will dynamically get a port assigned through Mesos. + diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java index 6c802fa7658..753923fc35e 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java @@ -120,4 +120,12 @@ public class MesosOptions { .withDescription("Enables SSL for the Flink artifact server. Note that security.ssl.enabled also needs to" + " be set to true encryption to enable encryption."); + /** + * Config parameter to configure which configuration keys will dynamically get a port assigned through Mesos. + */ + public static final ConfigOption PORT_ASSIGNMENTS = key("mesos.resourcemanager.tasks.port-assignments") + .defaultValue("") + .withDescription("Comma-separated list of configuration keys which represent a configurable port." + + "All port keys will dynamically get a port assigned through Mesos."); + } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index 93c90b76d86..bb15aee0a7f 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -30,8 +30,6 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.util.Preconditions; -import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; - import com.netflix.fenzo.ConstraintEvaluator; import com.netflix.fenzo.TaskRequest; import com.netflix.fenzo.VMTaskFitnessCalculator; @@ -41,8 +39,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,6 +53,7 @@ import scala.Option; import static org.apache.flink.mesos.Utils.rangeValues; import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS; /** * Implements the launch of a Mesos worker. @@ -66,7 +67,7 @@ public class LaunchableMesosWorker implements LaunchableTask { /** * The set of configuration keys to be dynamically configured with a port allocated from Mesos. */ - private static final String[] TM_PORT_KEYS = { + static final String[] TM_PORT_KEYS = { "taskmanager.rpc.port", "taskmanager.data.port"}; @@ -147,7 +148,7 @@ public class LaunchableMesosWorker implements LaunchableTask { @Override public int getPorts() { - return TM_PORT_KEYS.length; + return extractPortKeys(containerSpec.getDynamicConfiguration()).size(); } @Override @@ -235,9 +236,10 @@ public class LaunchableMesosWorker implements LaunchableTask { } // take needed ports for the TM - List portResources = allocation.takeRanges("ports", TM_PORT_KEYS.length, roles); + Set tmPortKeys = extractPortKeys(containerSpec.getDynamicConfiguration()); + List portResources = allocation.takeRanges("ports", tmPortKeys.size(), roles); taskInfo.addAllResources(portResources); - Iterator portsToAssign = Iterators.forArray(TM_PORT_KEYS); + Iterator portsToAssign = tmPortKeys.iterator(); rangeValues(portResources).forEach(port -> dynamicProperties.setLong(portsToAssign.next(), port)); if (portsToAssign.hasNext()) { throw new IllegalArgumentException("insufficient # of ports assigned"); @@ -332,6 +334,26 @@ public class LaunchableMesosWorker implements LaunchableTask { return taskInfo.build(); } + /** + * Get the port keys representing the TM's configured endpoints. This includes mandatory TM endpoints such as + * data and rpc as well as optionally configured endpoints for services such as prometheus reporter + * + * @param config to extract the port keys from + * @return A deterministically ordered Set of port keys to expose from the TM container + */ + static Set extractPortKeys(Configuration config) { + final LinkedHashSet tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS)); + + final String portKeys = config.getString(PORT_ASSIGNMENTS); + + Arrays.stream(portKeys.split(",")) + .map(String::trim) + .peek(key -> LOG.debug("Adding port key " + key + " to mesos request")) + .forEach(tmPortKeys::add); + + return tmPortKeys; + } + @Override public String toString() { return "LaunchableMesosWorker{" + diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java new file mode 100644 index 00000000000..6784e427c1f --- /dev/null +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Iterator; +import java.util.Set; + +import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS; +import static org.junit.Assert.assertEquals; + +/** + * Test that mesos config are extracted correctly from the configuration. + */ +public class LaunchableMesosWorkerTest extends TestLogger { + + @Test + public void canGetPortKeys() { + // Setup + Configuration config = new Configuration(); + config.setString(PORT_ASSIGNMENTS, "someport.here,anotherport"); + + // Act + Set portKeys = LaunchableMesosWorker.extractPortKeys(config); + + // Assert + assertEquals("Must get right number of port keys", 4, portKeys.size()); + Iterator iterator = portKeys.iterator(); + assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next()); + assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next()); + assertEquals("port key must be correct", "someport.here", iterator.next()); + assertEquals("port key must be correct", "anotherport", iterator.next()); + } + +} -- GitLab