提交 2dcff4c1 编写于 作者: S Stephan Ewen

[FLINK-1559] [akka] Normalize all akka URLs to use IP addresses rather than hostnames

上级 b941cf2d
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.flink.runtime.akka package org.apache.flink.runtime.akka
import java.io.IOException import java.io.IOException
import java.net.InetAddress
import java.util.concurrent.{TimeUnit, Callable} import java.util.concurrent.{TimeUnit, Callable}
import akka.actor.Actor.Receive import akka.actor.Actor.Receive
...@@ -94,9 +95,12 @@ object AkkaUtils { ...@@ -94,9 +95,12 @@ object AkkaUtils {
val defaultConfig = getBasicAkkaConfig(configuration) val defaultConfig = getBasicAkkaConfig(configuration)
listeningAddress match { listeningAddress match {
case Some((hostname, port)) => case Some((hostname, port)) =>
val remoteConfig = getRemoteAkkaConfig(configuration, hostname, port) val ipAddress = InetAddress.getByName(hostname).getHostAddress()
val remoteConfig = getRemoteAkkaConfig(configuration, ipAddress, port)
remoteConfig.withFallback(defaultConfig) remoteConfig.withFallback(defaultConfig)
case None => case None =>
defaultConfig defaultConfig
} }
...@@ -174,10 +178,12 @@ object AkkaUtils { ...@@ -174,10 +178,12 @@ object AkkaUtils {
*/ */
private def getRemoteAkkaConfig(configuration: Configuration, private def getRemoteAkkaConfig(configuration: Configuration,
hostname: String, port: Int): Config = { hostname: String, port: Int): Config = {
val akkaAskTimeout = Duration(configuration.getString(ConfigConstants.AKKA_ASK_TIMEOUT, val akkaAskTimeout = Duration(configuration.getString(
ConfigConstants.AKKA_ASK_TIMEOUT,
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)) ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
val startupTimeout = configuration.getString(ConfigConstants.AKKA_STARTUP_TIMEOUT, val startupTimeout = configuration.getString(
ConfigConstants.AKKA_STARTUP_TIMEOUT,
akkaAskTimeout.toString) akkaAskTimeout.toString)
val transportHeartbeatInterval = configuration.getString( val transportHeartbeatInterval = configuration.getString(
...@@ -188,25 +194,32 @@ object AkkaUtils { ...@@ -188,25 +194,32 @@ object AkkaUtils {
ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_PAUSE, ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_PAUSE,
ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE) ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE)
val transportThreshold = configuration.getDouble(ConfigConstants.AKKA_TRANSPORT_THRESHOLD, val transportThreshold = configuration.getDouble(
ConfigConstants.AKKA_TRANSPORT_THRESHOLD,
ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD) ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD)
val watchHeartbeatInterval = configuration.getString( val watchHeartbeatInterval = configuration.getString(
ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, (akkaAskTimeout/10).toString) ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL,
(akkaAskTimeout/10).toString)
val watchHeartbeatPause = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, val watchHeartbeatPause = configuration.getString(
ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
akkaAskTimeout.toString) akkaAskTimeout.toString)
val watchThreshold = configuration.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, val watchThreshold = configuration.getDouble(
ConfigConstants.AKKA_WATCH_THRESHOLD,
ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD) ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD)
val akkaTCPTimeout = configuration.getString(ConfigConstants.AKKA_TCP_TIMEOUT, val akkaTCPTimeout = configuration.getString(
ConfigConstants.AKKA_TCP_TIMEOUT,
akkaAskTimeout.toString) akkaAskTimeout.toString)
val akkaFramesize = configuration.getString(ConfigConstants.AKKA_FRAMESIZE, val akkaFramesize = configuration.getString(
ConfigConstants.AKKA_FRAMESIZE,
ConfigConstants.DEFAULT_AKKA_FRAMESIZE) ConfigConstants.DEFAULT_AKKA_FRAMESIZE)
val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, val lifecycleEvents = configuration.getBoolean(
ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
val logLifecycleEvents = if (lifecycleEvents) "on" else "off" val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package org.apache.flink.runtime.client package org.apache.flink.runtime.client
import java.io.IOException import java.io.IOException
import java.net.InetSocketAddress import java.net.{InetAddress, InetSocketAddress}
import akka.actor.Status.Failure import akka.actor.Status.Failure
import akka.actor._ import akka.actor._
...@@ -172,7 +172,8 @@ object JobClient { ...@@ -172,7 +172,8 @@ object JobClient {
"JobManager address has not been specified in the configuration.") "JobManager address has not been specified in the configuration.")
} }
JobManager.getRemoteJobManagerAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress), jobManagerRPCPort)
JobManager.getRemoteJobManagerAkkaURL(hostPort)
} }
} }
......
...@@ -848,17 +848,6 @@ object JobManager { ...@@ -848,17 +848,6 @@ object JobManager {
// Resolving the JobManager endpoint // Resolving the JobManager endpoint
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
/**
* Builds the akka actor path for the JobManager actor, given the address (host:port)
* where the JobManager's actor system runs.
*
* @param address The address (host:port) of the JobManager's actor system.
* @return The akka URL of the JobManager actor.
*/
def getRemoteJobManagerAkkaURL(address: String): String = {
s"akka.tcp://flink@$address/user/$JOB_MANAGER_NAME"
}
/** /**
* Builds the akka actor path for the JobManager actor, given the socket address * Builds the akka actor path for the JobManager actor, given the socket address
* where the JobManager's actor system runs. * where the JobManager's actor system runs.
...@@ -867,7 +856,8 @@ object JobManager { ...@@ -867,7 +856,8 @@ object JobManager {
* @return The akka URL of the JobManager actor. * @return The akka URL of the JobManager actor.
*/ */
def getRemoteJobManagerAkkaURL(address: InetSocketAddress): String = { def getRemoteJobManagerAkkaURL(address: InetSocketAddress): String = {
getRemoteJobManagerAkkaURL(address.getAddress().getHostAddress() + ":" + address.getPort) val hostPort = address.getAddress().getHostAddress() + ":" + address.getPort()
s"akka.tcp://flink@$hostPort/user/$JOB_MANAGER_NAME"
} }
/** /**
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package org.apache.flink.runtime.minicluster package org.apache.flink.runtime.minicluster
import java.net.InetAddress
import akka.pattern.ask import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem} import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config import com.typesafe.config.Config
...@@ -42,7 +44,9 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration, ...@@ -42,7 +44,9 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration,
val singleActorSystem: Boolean) { val singleActorSystem: Boolean) {
import FlinkMiniCluster._ import FlinkMiniCluster._
val HOSTNAME = "localhost" // NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
// not getLocalHost(), which may be 127.0.1.1
val HOSTNAME = InetAddress.getByName("localhost").getHostAddress()
implicit val timeout = AkkaUtils.getTimeout(userConfiguration) implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
......
...@@ -821,18 +821,22 @@ object TaskManager { ...@@ -821,18 +821,22 @@ object TaskManager {
val jobManagerURL = if (localAkkaCommunication) { val jobManagerURL = if (localAkkaCommunication) {
// JobManager and TaskManager are in the same ActorSystem -> Use local Akka URL // JobManager and TaskManager are in the same ActorSystem -> Use local Akka URL
JobManager.getLocalJobManagerAkkaURL JobManager.getLocalJobManagerAkkaURL
} else { }
val jobManagerAddress = configuration.getString(ConfigConstants else {
.JOB_MANAGER_IPC_ADDRESS_KEY, null) val jobManagerAddress = configuration.getString(
val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
val jobManagerRPCPort = configuration.getInteger(
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
if (jobManagerAddress == null) { if (jobManagerAddress == null) {
throw new RuntimeException("JobManager address has not been specified in the " + throw new RuntimeException(
"configuration.") "JobManager address has not been specified in the configuration.")
} }
JobManager.getRemoteJobManagerAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress), jobManagerRPCPort)
JobManager.getRemoteJobManagerAkkaURL(hostPort)
} }
val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
......
...@@ -16,12 +16,12 @@ ...@@ -16,12 +16,12 @@
# limitations under the License. # limitations under the License.
################################################################################ ################################################################################
# Set root logger level to DEBUG and its only appender to A1. # Set root logger level to OFF to not flood build logs
log4j.rootLogger=OFF, A1 # set manually to INFO for debugging purposes
log4j.rootLogger=OFF, testlogger
# A1 is set to be a ConsoleAppender. # A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
# A1 uses PatternLayout. log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout=org.apache.log4j.PatternLayout log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file
\ No newline at end of file
...@@ -106,7 +106,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { ...@@ -106,7 +106,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class)); applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class));
// instruct ApplicationClient to start a periodical status polling // instruct ApplicationClient to start a periodical status polling
applicationClient.tell(new Messages.LocalRegisterClient(jobManagerHost + ":" + jobManagerPort), applicationClient); applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
// add hook to ensure proper shutdown // add hook to ensure proper shutdown
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package org.apache.flink.yarn package org.apache.flink.yarn
import java.net.InetSocketAddress
import akka.actor._ import akka.actor._
import org.apache.flink.configuration.GlobalConfiguration import org.apache.flink.configuration.GlobalConfiguration
import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.ActorLogMessages
...@@ -63,7 +65,7 @@ class ApplicationClient extends Actor with ActorLogMessages with ActorLogging { ...@@ -63,7 +65,7 @@ class ApplicationClient extends Actor with ActorLogMessages with ActorLogging {
override def receiveWithLogMessages: Receive = { override def receiveWithLogMessages: Receive = {
// ----------------------------- Registration -> Status updates -> shutdown ---------------- // ----------------------------- Registration -> Status updates -> shutdown ----------------
case LocalRegisterClient(address: String) => case LocalRegisterClient(address: InetSocketAddress) =>
val jmAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(address) val jmAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(address)
val jobManagerFuture = AkkaUtils.getReference(jmAkkaUrl, system, timeout) val jobManagerFuture = AkkaUtils.getReference(jmAkkaUrl, system, timeout)
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.flink.yarn package org.apache.flink.yarn
import java.net.InetSocketAddress
import java.util.Date import java.util.Date
import akka.actor.ActorRef import akka.actor.ActorRef
...@@ -41,7 +42,7 @@ object Messages { ...@@ -41,7 +42,7 @@ object Messages {
case object CheckForUserCommand case object CheckForUserCommand
// Client-local messages // Client-local messages
case class LocalRegisterClient(jobManagerAddress: String) case class LocalRegisterClient(jobManagerAddress: InetSocketAddress)
case object LocalGetYarnMessage // request new message case object LocalGetYarnMessage // request new message
case object LocalGetYarnClusterStatus // request the latest cluster status case object LocalGetYarnClusterStatus // request the latest cluster status
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册