提交 74b535d5 编写于 作者: R Robert Metzger

[FLINK-3074] Add config option to start YARN AM on port range

This closes #1416
上级 1190f3b1
......@@ -224,7 +224,7 @@ Note: State backend must be accessible from the JobManager, use file:// only for
- `blob.server.port`: Port definition for the blob server (serving user jar's) on the Taskmanagers.
By default the port is set to 0, which means that the operating system is picking an ephemeral port.
Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both.
It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running
It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running
on the same machine.
- `execution-retries.delay`: Delay between execution retries. Default value "5 s". Note that values
......@@ -428,6 +428,17 @@ For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationM
- `yarn.taskmanager.env.` Similar to the configuration prefix about, this prefix allows setting custom
environment variables for the TaskManager processes.
- `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port)
With this configuration option, users can specify a port, a range of ports or a list of ports for the
Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to
let the operating system choose an appropriate port. In particular when multiple AMs are running on the
same physical host, fixed port assignments prevent the AM from starting.
For example when running Flink on YARN on an environment with a restrictive firewall, this
option allows specifying a range of allowed ports.
## High Availability Mode
- `recovery.mode`: (Default 'standalone') Defines the recovery mode used for the cluster execution. Currently,
......
......@@ -257,6 +257,31 @@ It allows to access log files for running YARN applications and shows diagnostic
Users using Hadoop distributions from companies like Hortonworks, Cloudera or MapR might have to build Flink against their specific versions of Hadoop (HDFS) and YARN. Please read the [build instructions](building.html) for more details.
## Running Flink on YARN behind Firewalls
Some YARN clusters use firewalls for controlling the network traffic between the cluster and the rest of the network.
In those setups, Flink jobs can only be submitted to a YARN session from within the cluster's network (behind the firewall).
If this is not feasible for production use, Flink allows to configure a port range for all relevant services. With these
ranges configured, users can also submit jobs to Flink crossing the firewall.
Currently, two services are needed to submit a job:
* The JobManager (ApplicatonMaster in YARN)
* The BlobServer running within the JobManager.
When submitting a job to Flink, the BlobServer will distribute the jars with the user code to all worker nodes (TaskManagers).
The JobManager receives the job itself and triggers the execution.
The two configuration parameters for specifying the ports are the following:
* `yarn.application-master.port`
* `blob.server.port`
These two configuration options accept single ports (for example: "50010"), ranges ("50000-50025"), or a combination of
both ("50010,50011,50020-50025,50050-50075").
(Hadoop is using a similar mechanism, there the configuration parameter is called `yarn.app.mapreduce.am.job.client.port-range`.)
## Background / Internals
This section briefly describes how Flink and YARN interact.
......
......@@ -86,6 +86,8 @@ public final class ConfigConstants {
* The port can either be a port, such as "9123",
* a range of ports: "50100-50200"
* or a list of ranges and or points: "50100-50200,50300-50400,51234"
*
* Setting the port to 0 will let the OS choose an available port.
*/
public static final String BLOB_SERVER_PORT = "blob.server.port";
......@@ -264,6 +266,20 @@ public final class ConfigConstants {
public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";
/**
* The config parameter defining the Akka actor system port for the ApplicationMaster and
* JobManager
*
* The port can either be a port, such as "9123",
* a range of ports: "50100-50200"
* or a list of ranges and or points: "50100-50200,50300-50400,51234"
*
* Setting the port to 0 will let the OS choose an available port.
*/
public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
// ------------------------ Hadoop Configuration ------------------------
/**
......@@ -628,6 +644,12 @@ public final class ConfigConstants {
* Relative amount of memory to subtract from the requested memory.
*/
public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
/**
* Default port for the application master is 0, which means
* the operating system assigns an ephemeral port
*/
public static final String DEFAULT_YARN_APPLICATION_MASTER_PORT = "0";
// ------------------------ File System Behavior ------------------------
......
......@@ -18,7 +18,10 @@
package org.apache.flink.util;
import com.google.common.collect.Iterators;
import com.google.common.net.InetAddresses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.Inet4Address;
......@@ -29,10 +32,13 @@ import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class NetUtils {
private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
/**
* Turn a fully qualified domain name (fqdn) into a hostname. If the fqdn has multiple subparts
......@@ -170,30 +176,75 @@ public class NetUtils {
}
/**
* Returns a set of available ports defined by the range definition.
* Returns an iterator over available ports defined by the range definition.
*
* @param rangeDefinition String describing a single port, a range of ports or multiple ranges.
* @return Set of ports from the range definition
* @throws NumberFormatException If an invalid string is passed.
*/
public static Set<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
Set<Integer> finalSet = new HashSet<>();
public static Iterator<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
final String[] ranges = rangeDefinition.trim().split(",");
List<Iterator<Integer>> iterators = new ArrayList<>(ranges.length);
for(String rawRange: ranges) {
Iterator<Integer> rangeIterator = null;
String range = rawRange.trim();
int dashIdx = range.indexOf('-');
if (dashIdx == -1) {
// only one port in range:
finalSet.add(Integer.valueOf(range));
rangeIterator = Iterators.singletonIterator(Integer.valueOf(range));
} else {
// evaluate range
int start = Integer.valueOf(range.substring(0, dashIdx));
int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
for(int i = start; i <= end; i++) {
finalSet.add(i);
final int start = Integer.valueOf(range.substring(0, dashIdx));
final int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
rangeIterator = new Iterator<Integer>() {
int i = start;
@Override
public boolean hasNext() {
return i <= end;
}
@Override
public Integer next() {
return i++;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported");
}
};
}
iterators.add(rangeIterator);
}
return Iterators.concat(iterators.iterator());
}
/**
* Tries to allocate a socket from the given sets of ports.
*
* @param portsIterator A set of ports to choose from.
* @param factory A factory for creating the SocketServer
* @return null if no port was available or an allocated socket.
*/
public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator, SocketFactory factory) throws IOException {
while (portsIterator.hasNext()) {
int port = portsIterator.next();
LOG.debug("Trying to open socket on port {}", port);
try {
return factory.createSocket(port);
} catch (IOException | IllegalArgumentException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to allocate socket on port", e);
} else {
LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage());
}
}
}
return finalSet;
return null;
}
public interface SocketFactory {
ServerSocket createSocket(int port) throws IOException;
}
}
......@@ -18,11 +18,17 @@
package org.apache.flink.util;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.Iterators;
import org.junit.Assert;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import static org.hamcrest.core.IsCollectionContaining.hasItems;
......@@ -99,11 +105,18 @@ public class NetUtilsTest {
}
}
@Test
public void testFreePortRangeUtility() {
// inspired by Hadoop's example for "yarn.app.mapreduce.am.job.client.port-range"
String rangeDefinition = "50000-50050, 50100-50200,51234 "; // this also contains some whitespaces
Set<Integer> ports = NetUtils.getPortRangeFromString(rangeDefinition);
Iterator<Integer> portsIter = NetUtils.getPortRangeFromString(rangeDefinition);
Set<Integer> ports = new HashSet<>();
while(portsIter.hasNext()) {
Assert.assertTrue("Duplicate element", ports.add(portsIter.next()));
}
Assert.assertEquals(51+101+1, ports.size());
// check first range
Assert.assertThat(ports, hasItems(50000, 50001, 50002, 50050));
......@@ -114,14 +127,20 @@ public class NetUtilsTest {
// test single port "range":
ports = NetUtils.getPortRangeFromString(" 51234");
Assert.assertEquals(1, ports.size());
Assert.assertEquals(51234, (int)ports.iterator().next());
portsIter = NetUtils.getPortRangeFromString(" 51234");
Assert.assertTrue(portsIter.hasNext());
Assert.assertEquals(51234, (int)portsIter.next());
Assert.assertFalse(portsIter.hasNext());
// test port list
ports = NetUtils.getPortRangeFromString("5,1,2,3,4");
Assert.assertEquals(5, ports.size());
Assert.assertThat(ports, hasItems(1,2,3,4,5));
portsIter = NetUtils.getPortRangeFromString("5,1,2,3,4");
Assert.assertTrue(portsIter.hasNext());
Assert.assertEquals(5, (int)portsIter.next());
Assert.assertEquals(1, (int)portsIter.next());
Assert.assertEquals(2, (int)portsIter.next());
Assert.assertEquals(3, (int)portsIter.next());
Assert.assertEquals(4, (int)portsIter.next());
Assert.assertFalse(portsIter.hasNext());
Throwable error = null;
......
......@@ -68,7 +68,7 @@ public class BlobServer extends Thread implements BlobService {
private final BlobStore blobStore;
/** Set of currently running threads */
private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>();
private final Set<BlobServerConnection> activeConnections = new HashSet<>();
/** The maximum number of concurrent connections */
private final int maxConnections;
......@@ -142,23 +142,17 @@ public class BlobServer extends Thread implements BlobService {
// ----------------------- start the server -------------------
String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange).iterator();
ServerSocket socketAttempt = null;
while(ports.hasNext()) {
int port = ports.next();
LOG.debug("Trying to open socket on port {}", port);
try {
socketAttempt = new ServerSocket(port, backlog);
break; // we were able to use the port.
} catch (IOException | IllegalArgumentException e) {
if(LOG.isDebugEnabled()) {
LOG.debug("Unable to allocate socket on port", e);
} else {
LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage());
}
Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);
final int finalBacklog = backlog;
ServerSocket socketAttempt = NetUtils.createSocketFromPorts(ports, new NetUtils.SocketFactory() {
@Override
public ServerSocket createSocket(int port) throws IOException {
return new ServerSocket(port, finalBacklog);
}
}
});
if(socketAttempt == null) {
throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange);
} else {
......
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Convenience file for local debugging of the JobManager/TaskManager.
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
......@@ -19,22 +19,27 @@
package org.apache.flink.yarn
import java.io.{FileWriter, BufferedWriter, PrintWriter}
import java.net.{BindException, ServerSocket}
import java.security.PrivilegedAction
import akka.actor.ActorSystem
import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.client.CliFrontend
import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManagerMode, JobManager}
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.runtime.webmonitor.WebMonitor
import org.apache.flink.util.NetUtils
import org.apache.flink.yarn.YarnMessages.StartYarnSession
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.jboss.netty.channel.ChannelException
import org.slf4j.LoggerFactory
import scala.annotation.tailrec
import scala.io.Source
import scala.util.{Success, Failure, Try}
/** Base class for all application masters. This base class provides functionality to start a
* [[JobManager]] implementation in a Yarn container.
......@@ -111,17 +116,78 @@ abstract class ApplicationMasterBase {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
}
val (actorSystem, jmActor, archiveActor, webMonitor) =
// we try to start the JobManager actor system using the port definition
// from the config.
// first, we check if the port is available by opening a socket
// if the actor system fails to start on the port, we try further
val amPortRange: String = config.getString(ConfigConstants.YARN_APPLICATION_MASTER_PORT,
ConfigConstants.DEFAULT_YARN_APPLICATION_MASTER_PORT)
val portsIterator = NetUtils.getPortRangeFromString(amPortRange)
// method to start the actor system.
def startActorSystem(
portsIterator: java.util.Iterator[Integer])
: (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
val availableSocket = NetUtils.createSocketFromPorts(
portsIterator,
new NetUtils.SocketFactory {
override def createSocket(port: Int): ServerSocket = new ServerSocket(port)
})
// get port as integer and close socket
val tryPort = if (availableSocket == null) {
throw new BindException(s"Unable to allocate port for ApplicationMaster in " +
s"specified port range: $amPortRange ")
} else {
val port = availableSocket.getLocalPort
availableSocket.close()
port // return for if
}
JobManager.startActorSystemAndJobManagerActors(
config,
JobManagerMode.CLUSTER,
ownHostname,
0,
tryPort,
getJobManagerClass,
getArchivistClass
)
}
@tailrec
def retry[T](fn: => T, stopCond: => Boolean): Try[T] = {
Try {
fn
} match {
case Failure(x: BindException) =>
if (stopCond) {
Failure(new RuntimeException("Unable to do further retries starting the actor " +
"system"))
} else {
retry(fn, stopCond)
}
case Failure(x: Exception) => x.getCause match {
case c: ChannelException =>
if (stopCond) {
Failure(new RuntimeException("Unable to do further retries starting the actor " +
"system"))
} else {
retry(fn, stopCond)
}
case _ => Failure(x)
}
case f => f
}
}
// try starting the actor system
val result = retry(startActorSystem(portsIterator), {portsIterator.hasNext})
val (actorSystem, jmActor, archiveActor, webMonitor) = result match {
case Success(r) => r
case Failure(failure) => throw new RuntimeException("Unable to start actor system", failure)
}
actorSystemOption = Option(actorSystem)
webMonitorOption = webMonitor
val address = AkkaUtils.getAddress(actorSystem)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册