提交 e43b970a 编写于 作者: U Ufuk Celebi

[FLINK-3172] [core, runtime, yarn] Allow port range for job manager with high availability

This closes #1458.
上级 21882af0
......@@ -55,6 +55,8 @@ jobManagerAddress1:webUIPort1
By default, the job manager will pick a *random port* for inter process communication. You can change this via the **`recovery.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
#### Config File (flink-conf.yaml)
In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:
......@@ -452,6 +452,9 @@ public final class ConfigConstants {
/** Defines recovery mode used for the cluster execution ("standalone", "zookeeper") */
public static final String RECOVERY_MODE = "recovery.mode";
/** Ports used by the job manager if not in standalone recovery mode */
public static final String RECOVERY_JOB_MANAGER_PORT = "recovery.jobmanager.port";
// --------------------------- ZooKeeper ----------------------------------
/** ZooKeeper servers. */
......@@ -728,6 +731,12 @@ public final class ConfigConstants {
public static String DEFAULT_RECOVERY_MODE = "standalone";
* Default port used by the job manager if not in standalone recovery mode. If <code>0</code>
* the OS picks a random port port.
public static final String DEFAULT_RECOVERY_JOB_MANAGER_PORT = "0";
// --------------------------- ZooKeeper ----------------------------------
public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink";
......@@ -19,10 +19,10 @@
package org.apache.flink.runtime.jobmanager
import java.io.{File, IOException}
import java.net.{UnknownHostException, InetAddress, InetSocketAddress}
import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress}
import java.util.UUID
import akka.actor.Status.{Success, Failure}
import akka.actor.Status.Failure
import akka.actor._
import akka.pattern.ask
import grizzled.slf4j.Logger
......@@ -61,7 +61,9 @@ import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
import org.jboss.netty.channel.ChannelException
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
......@@ -1509,7 +1511,8 @@ object JobManager {
// parsing the command line arguments
val (configuration: Configuration,
executionMode: JobManagerMode,
listeningHost: String, listeningPort: Int) =
listeningHost: String,
listeningPortRange: java.util.Iterator[Integer]) =
try {
......@@ -1530,19 +1533,16 @@ object JobManager {
if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
// address and will not be reachable from anyone remote
if (listeningPort != 0) {
val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
"' is invalid, it must be equal to 0."
if (!listeningPortRange.hasNext) {
if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
val message = "Config parameter '" + ConfigConstants.RECOVERY_JOB_MANAGER_PORT +
"' does not specify a valid port range."
} else {
// address and will not be reachable from anyone remote
if (listeningPort <= 0 || listeningPort >= 65536) {
val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
"' is invalid, it must be greater than 0 and less than 65536."
else {
val message = s"Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
"' does not specify a valid port."
......@@ -1558,20 +1558,18 @@ object JobManager {
else {
} else {
LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
catch {
} catch {
case t: Throwable =>
LOG.error("Failed to run JobManager.", t)
......@@ -1612,6 +1610,103 @@ object JobManager {
* Starts and runs the JobManager with all its components trying to bind to
* a port in the specified range.
* @param configuration The configuration object for the JobManager.
* @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
* an additional TaskManager in the same process.
* @param listeningAddress The hostname where the JobManager should listen for messages.
* @param listeningPortRange The port range where the JobManager should listen for messages.
def runJobManager(
configuration: Configuration,
executionMode: JobManagerMode,
listeningAddress: String,
listeningPortRange: java.util.Iterator[Integer])
: Unit = {
val result = retryOnBindException({
// Try all ports in the range until successful
val socket = NetUtils.createSocketFromPorts(
new NetUtils.SocketFactory {
override def createSocket(port: Int): ServerSocket = new ServerSocket(
// Use the correct listening address, bound ports will only be
// detected later by Akka.
port, 0, InetAddress.getByName(listeningAddress))
val port =
if (socket == null) {
throw new BindException(s"Unable to allocate port for JobManager.")
} else {
try {
} finally {
runJobManager(configuration, executionMode, listeningAddress, port)
}, { !listeningPortRange.hasNext }, 5000)
result match {
case scala.util.Failure(f) => throw f
* Retries a function if it fails because of a [[java.net.BindException]].
* @param fn The function to retry
* @param stopCond Flag to signal termination
* @param maxSleepBetweenRetries Max random sleep time between retries
* @tparam T Return type of the the function to retry
* @return Return value of the the function to retry
def retryOnBindException[T](
fn: => T,
stopCond: => Boolean,
maxSleepBetweenRetries : Long = 0 )
: scala.util.Try[T] = {
def sleepBeforeRetry : Unit = {
if (maxSleepBetweenRetries > 0) {
val sleepTime = ((Math.random() * maxSleepBetweenRetries).asInstanceOf[Long])
LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} ms.")
scala.util.Try {
} match {
case scala.util.Failure(x: BindException) =>
if (stopCond) {
scala.util.Failure(new RuntimeException(
"Unable to do further retries starting the actor system"))
} else {
retryOnBindException(fn, stopCond)
case scala.util.Failure(x: Exception) => x.getCause match {
case c: ChannelException =>
if (stopCond) {
scala.util.Failure(new RuntimeException(
"Unable to do further retries starting the actor system"))
} else {
retryOnBindException(fn, stopCond)
case _ => scala.util.Failure(x)
case f => f
/** Starts an ActorSystem, the JobManager and all its components including the WebMonitor.
* @param configuration The configuration object for the JobManager
......@@ -1760,7 +1855,8 @@ object JobManager {
* @param args command line arguments
* @return Quadruple of configuration, execution mode and an optional listening address
def parseArgs(args: Array[String]): (Configuration, JobManagerMode, String, Int) = {
def parseArgs(args: Array[String])
: (Configuration, JobManagerMode, String, java.util.Iterator[Integer]) = {
val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
head("Flink JobManager")
......@@ -1825,27 +1921,44 @@ object JobManager {
val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
// high availability mode
val port: Int =
val portRange =
// high availability mode
if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
LOG.info("Starting JobManager in High-Availability Mode")
LOG.info("Starting JobManager with high-availability")
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
// The port range of allowed job manager ports or 0 for random
else {
LOG.info("Starting JobManager without high-availability")
// In standalone mode, we don't allow port ranges
val listeningPort = configuration.getInteger(
if (listeningPort <= 0 || listeningPort >= 65536) {
val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
"' is invalid, it must be greater than 0 and less than 65536."
val executionMode = config.getJobManagerMode
val hostPortUrl = NetUtils.hostAndPortToUrlString(host, port)
LOG.info(s"Starting JobManager on $hostPortUrl with execution mode $executionMode")
val hostUrl = NetUtils.ipAddressToUrlString(InetAddress.getByName(host))
LOG.info(s"Starting JobManager on $hostUrl:$portRange with execution mode $executionMode")
val portRangeIterator = NetUtils.getPortRangeFromString(portRange)
(configuration, executionMode, host, port)
(configuration, executionMode, host, portRangeIterator)
......@@ -18,15 +18,15 @@
package org.apache.flink.yarn
import java.io.{FileWriter, BufferedWriter, PrintWriter}
import java.io.{BufferedWriter, FileWriter, PrintWriter}
import java.net.{BindException, ServerSocket}
import java.security.PrivilegedAction
import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.client.CliFrontend
import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManagerMode, JobManager}
import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode, MemoryArchivist}
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.runtime.webmonitor.WebMonitor
import org.apache.flink.util.NetUtils
......@@ -34,12 +34,10 @@ import org.apache.flink.yarn.YarnMessages.StartYarnSession
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.jboss.netty.channel.ChannelException
import org.slf4j.LoggerFactory
import scala.annotation.tailrec
import scala.io.Source
import scala.util.{Success, Failure, Try}
import scala.util.{Failure, Success}
/** Base class for all application masters. This base class provides functionality to start a
* [[JobManager]] implementation in a Yarn container.
......@@ -153,34 +151,10 @@ abstract class ApplicationMasterBase {
def retry[T](fn: => T, stopCond: => Boolean): Try[T] = {
Try {
} match {
case Failure(x: BindException) =>
if (stopCond) {
Failure(new RuntimeException("Unable to do further retries starting the actor " +
} else {
retry(fn, stopCond)
case Failure(x: Exception) => x.getCause match {
case c: ChannelException =>
if (stopCond) {
Failure(new RuntimeException("Unable to do further retries starting the actor " +
} else {
retry(fn, stopCond)
case _ => Failure(x)
case f => f
// try starting the actor system
val result = retry(startActorSystem(portsIterator), {!portsIterator.hasNext})
val result = JobManager.retryOnBindException(
val (actorSystem, jmActor, archiveActor, webMonitor) = result match {
case Success(r) => r
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册