未验证 提交 b74594c4 编写于 作者: R Rune Skou Larsen 提交者: Till Rohrmann

[FLINK-9703] Allow TM ports to be exposed through Mesos

Maintain a deterministic port ordering, so we can have expectations on which endpoint is behind which port index.

This closes #6288.
上级 d58c8c05
......@@ -62,5 +62,10 @@
<td style="word-wrap: break-word;">(none)</td>
<td>Mesos framework user</td>
</tr>
<tr>
<td><h5>mesos.resourcemanager.tasks.port-assignments</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Comma-separated list of configuration keys which represent a configurable port.All port keys will dynamically get a port assigned through Mesos.</td>
</tr>
</tbody>
</table>
......@@ -120,4 +120,12 @@ public class MesosOptions {
.withDescription("Enables SSL for the Flink artifact server. Note that security.ssl.enabled also needs to" +
" be set to true encryption to enable encryption.");
/**
* Config parameter to configure which configuration keys will dynamically get a port assigned through Mesos.
*/
public static final ConfigOption<String> PORT_ASSIGNMENTS = key("mesos.resourcemanager.tasks.port-assignments")
.defaultValue("")
.withDescription("Comma-separated list of configuration keys which represent a configurable port." +
"All port keys will dynamically get a port assigned through Mesos.");
}
......@@ -30,8 +30,6 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.VMTaskFitnessCalculator;
......@@ -41,8 +39,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -53,6 +53,7 @@ import scala.Option;
import static org.apache.flink.mesos.Utils.rangeValues;
import static org.apache.flink.mesos.Utils.variable;
import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
/**
* Implements the launch of a Mesos worker.
......@@ -66,7 +67,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
/**
* The set of configuration keys to be dynamically configured with a port allocated from Mesos.
*/
private static final String[] TM_PORT_KEYS = {
static final String[] TM_PORT_KEYS = {
"taskmanager.rpc.port",
"taskmanager.data.port"};
......@@ -147,7 +148,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
@Override
public int getPorts() {
return TM_PORT_KEYS.length;
return extractPortKeys(containerSpec.getDynamicConfiguration()).size();
}
@Override
......@@ -235,9 +236,10 @@ public class LaunchableMesosWorker implements LaunchableTask {
}
// take needed ports for the TM
List<Protos.Resource> portResources = allocation.takeRanges("ports", TM_PORT_KEYS.length, roles);
Set<String> tmPortKeys = extractPortKeys(containerSpec.getDynamicConfiguration());
List<Protos.Resource> portResources = allocation.takeRanges("ports", tmPortKeys.size(), roles);
taskInfo.addAllResources(portResources);
Iterator<String> portsToAssign = Iterators.forArray(TM_PORT_KEYS);
Iterator<String> portsToAssign = tmPortKeys.iterator();
rangeValues(portResources).forEach(port -> dynamicProperties.setLong(portsToAssign.next(), port));
if (portsToAssign.hasNext()) {
throw new IllegalArgumentException("insufficient # of ports assigned");
......@@ -332,6 +334,26 @@ public class LaunchableMesosWorker implements LaunchableTask {
return taskInfo.build();
}
/**
* Get the port keys representing the TM's configured endpoints. This includes mandatory TM endpoints such as
* data and rpc as well as optionally configured endpoints for services such as prometheus reporter
*
* @param config to extract the port keys from
* @return A deterministically ordered Set of port keys to expose from the TM container
*/
static Set<String> extractPortKeys(Configuration config) {
final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
final String portKeys = config.getString(PORT_ASSIGNMENTS);
Arrays.stream(portKeys.split(","))
.map(String::trim)
.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
.forEach(tmPortKeys::add);
return tmPortKeys;
}
@Override
public String toString() {
return "LaunchableMesosWorker{" +
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.util.Iterator;
import java.util.Set;
import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
import static org.junit.Assert.assertEquals;
/**
* Test that mesos config are extracted correctly from the configuration.
*/
public class LaunchableMesosWorkerTest extends TestLogger {
@Test
public void canGetPortKeys() {
// Setup
Configuration config = new Configuration();
config.setString(PORT_ASSIGNMENTS, "someport.here,anotherport");
// Act
Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
// Assert
assertEquals("Must get right number of port keys", 4, portKeys.size());
Iterator<String> iterator = portKeys.iterator();
assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
assertEquals("port key must be correct", "someport.here", iterator.next());
assertEquals("port key must be correct", "anotherport", iterator.next());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册