From 2dcff4c122f37ca58a485e886fec9295fef0c832 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 16 Feb 2015 19:48:32 +0100 Subject: [PATCH] [FLINK-1559] [akka] Normalize all akka URLs to use IP addresses rather than hostnames --- .../apache/flink/runtime/akka/AkkaUtils.scala | 33 +++++++++++++------ .../flink/runtime/client/JobClient.scala | 5 +-- .../flink/runtime/jobmanager/JobManager.scala | 14 ++------ .../minicluster/FlinkMiniCluster.scala | 6 +++- .../runtime/taskmanager/TaskManager.scala | 20 ++++++----- .../src/test/resources/log4j-test.properties | 14 ++++---- .../apache/flink/yarn/FlinkYarnCluster.java | 2 +- .../apache/flink/yarn/ApplicationClient.scala | 4 ++- .../org/apache/flink/yarn/Messages.scala | 3 +- 9 files changed, 58 insertions(+), 43 deletions(-) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 2c9daadfd01..5a1a2be5dea 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -19,6 +19,7 @@ package org.apache.flink.runtime.akka import java.io.IOException +import java.net.InetAddress import java.util.concurrent.{TimeUnit, Callable} import akka.actor.Actor.Receive @@ -94,9 +95,12 @@ object AkkaUtils { val defaultConfig = getBasicAkkaConfig(configuration) listeningAddress match { + case Some((hostname, port)) => - val remoteConfig = getRemoteAkkaConfig(configuration, hostname, port) + val ipAddress = InetAddress.getByName(hostname).getHostAddress() + val remoteConfig = getRemoteAkkaConfig(configuration, ipAddress, port) remoteConfig.withFallback(defaultConfig) + case None => defaultConfig } @@ -174,10 +178,12 @@ object AkkaUtils { */ private def getRemoteAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = { - val akkaAskTimeout = Duration(configuration.getString(ConfigConstants.AKKA_ASK_TIMEOUT, + val akkaAskTimeout = Duration(configuration.getString( + ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)) - val startupTimeout = configuration.getString(ConfigConstants.AKKA_STARTUP_TIMEOUT, + val startupTimeout = configuration.getString( + ConfigConstants.AKKA_STARTUP_TIMEOUT, akkaAskTimeout.toString) val transportHeartbeatInterval = configuration.getString( @@ -188,25 +194,32 @@ object AkkaUtils { ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_PAUSE, ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE) - val transportThreshold = configuration.getDouble(ConfigConstants.AKKA_TRANSPORT_THRESHOLD, + val transportThreshold = configuration.getDouble( + ConfigConstants.AKKA_TRANSPORT_THRESHOLD, ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD) val watchHeartbeatInterval = configuration.getString( - ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, (akkaAskTimeout/10).toString) + ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, + (akkaAskTimeout/10).toString) - val watchHeartbeatPause = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, + val watchHeartbeatPause = configuration.getString( + ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, akkaAskTimeout.toString) - val watchThreshold = configuration.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, + val watchThreshold = configuration.getDouble( + ConfigConstants.AKKA_WATCH_THRESHOLD, ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD) - val akkaTCPTimeout = configuration.getString(ConfigConstants.AKKA_TCP_TIMEOUT, + val akkaTCPTimeout = configuration.getString( + ConfigConstants.AKKA_TCP_TIMEOUT, akkaAskTimeout.toString) - val akkaFramesize = configuration.getString(ConfigConstants.AKKA_FRAMESIZE, + val akkaFramesize = configuration.getString( + ConfigConstants.AKKA_FRAMESIZE, ConfigConstants.DEFAULT_AKKA_FRAMESIZE) - val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, + val lifecycleEvents = configuration.getBoolean( + ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) val logLifecycleEvents = if (lifecycleEvents) "on" else "off" diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index 60aefe45b87..34df4eb8013 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -19,7 +19,7 @@ package org.apache.flink.runtime.client import java.io.IOException -import java.net.InetSocketAddress +import java.net.{InetAddress, InetSocketAddress} import akka.actor.Status.Failure import akka.actor._ @@ -172,7 +172,8 @@ object JobClient { "JobManager address has not been specified in the configuration.") } - JobManager.getRemoteJobManagerAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) + val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress), jobManagerRPCPort) + JobManager.getRemoteJobManagerAkkaURL(hostPort) } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 1682443cabe..d522d2d52e0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -848,17 +848,6 @@ object JobManager { // Resolving the JobManager endpoint // -------------------------------------------------------------------------- - /** - * Builds the akka actor path for the JobManager actor, given the address (host:port) - * where the JobManager's actor system runs. - * - * @param address The address (host:port) of the JobManager's actor system. - * @return The akka URL of the JobManager actor. - */ - def getRemoteJobManagerAkkaURL(address: String): String = { - s"akka.tcp://flink@$address/user/$JOB_MANAGER_NAME" - } - /** * Builds the akka actor path for the JobManager actor, given the socket address * where the JobManager's actor system runs. @@ -867,7 +856,8 @@ object JobManager { * @return The akka URL of the JobManager actor. */ def getRemoteJobManagerAkkaURL(address: InetSocketAddress): String = { - getRemoteJobManagerAkkaURL(address.getAddress().getHostAddress() + ":" + address.getPort) + val hostPort = address.getAddress().getHostAddress() + ":" + address.getPort() + s"akka.tcp://flink@$hostPort/user/$JOB_MANAGER_NAME" } /** diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index dd158cb3beb..8f79003bb70 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -18,6 +18,8 @@ package org.apache.flink.runtime.minicluster +import java.net.InetAddress + import akka.pattern.ask import akka.actor.{ActorRef, ActorSystem} import com.typesafe.config.Config @@ -42,7 +44,9 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration, val singleActorSystem: Boolean) { import FlinkMiniCluster._ - val HOSTNAME = "localhost" + // NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and + // not getLocalHost(), which may be 127.0.1.1 + val HOSTNAME = InetAddress.getByName("localhost").getHostAddress() implicit val timeout = AkkaUtils.getTimeout(userConfiguration) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 4a0ae72b033..7b84928e086 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -821,18 +821,22 @@ object TaskManager { val jobManagerURL = if (localAkkaCommunication) { // JobManager and TaskManager are in the same ActorSystem -> Use local Akka URL JobManager.getLocalJobManagerAkkaURL - } else { - val jobManagerAddress = configuration.getString(ConfigConstants - .JOB_MANAGER_IPC_ADDRESS_KEY, null) - val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + } + else { + val jobManagerAddress = configuration.getString( + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + + val jobManagerRPCPort = configuration.getInteger( + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) if (jobManagerAddress == null) { - throw new RuntimeException("JobManager address has not been specified in the " + - "configuration.") + throw new RuntimeException( + "JobManager address has not been specified in the configuration.") } - JobManager.getRemoteJobManagerAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) + val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress), jobManagerRPCPort) + JobManager.getRemoteJobManagerAkkaURL(hostPort) } val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties index 0b686e543bb..b2d89ff2148 100644 --- a/flink-tests/src/test/resources/log4j-test.properties +++ b/flink-tests/src/test/resources/log4j-test.properties @@ -16,12 +16,12 @@ # limitations under the License. ################################################################################ -# Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=OFF, A1 +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger # A1 is set to be a ConsoleAppender. -log4j.appender.A1=org.apache.log4j.ConsoleAppender - -# A1 uses PatternLayout. -log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java index abfd4a90269..6fedd9a7778 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java @@ -106,7 +106,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class)); // instruct ApplicationClient to start a periodical status polling - applicationClient.tell(new Messages.LocalRegisterClient(jobManagerHost + ":" + jobManagerPort), applicationClient); + applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient); // add hook to ensure proper shutdown diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index b8079ab7a9e..684990bb86d 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -18,6 +18,8 @@ package org.apache.flink.yarn +import java.net.InetSocketAddress + import akka.actor._ import org.apache.flink.configuration.GlobalConfiguration import org.apache.flink.runtime.ActorLogMessages @@ -63,7 +65,7 @@ class ApplicationClient extends Actor with ActorLogMessages with ActorLogging { override def receiveWithLogMessages: Receive = { // ----------------------------- Registration -> Status updates -> shutdown ---------------- - case LocalRegisterClient(address: String) => + case LocalRegisterClient(address: InetSocketAddress) => val jmAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(address) val jobManagerFuture = AkkaUtils.getReference(jmAkkaUrl, system, timeout) diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala index 5cdbbffb1c9..0ac135da3ef 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala @@ -18,6 +18,7 @@ package org.apache.flink.yarn +import java.net.InetSocketAddress import java.util.Date import akka.actor.ActorRef @@ -41,7 +42,7 @@ object Messages { case object CheckForUserCommand // Client-local messages - case class LocalRegisterClient(jobManagerAddress: String) + case class LocalRegisterClient(jobManagerAddress: InetSocketAddress) case object LocalGetYarnMessage // request new message case object LocalGetYarnClusterStatus // request the latest cluster status -- GitLab