From 0ae72041a7e5d7c7179b65c75fec558f941f8eac Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Sat, 11 Oct 2014 01:08:42 +0200 Subject: [PATCH] Implemented proper shutdown of yarn containers if the system is shutdown via the yarn client. --- flink-addons/flink-yarn/pom.xml | 8 +- .../java/org/apache/flink/yarn/Client.java | 65 +++++++--------- .../yarn/appMaster/YarnTaskManagerRunner.java | 12 ++- .../apache/flink/yarn/ApplicationClient.scala | 31 ++++---- .../apache/flink/yarn/ApplicationMaster.scala | 29 ++++--- .../org/apache/flink/yarn/Messages.scala | 1 - ...{YarnMaster.scala => YarnJobManager.scala} | 68 ++++++++-------- .../apache/flink/yarn/YarnTaskManager.scala | 37 +++++++++ .../org/apache/flink/yarn/YarnUtils.scala | 26 ++++++- flink-dist/pom.xml | 38 +++------ flink-dist/src/main/assemblies/bin.xml | 9 ++- .../src/main/assemblies/yarn-uberjar.xml | 67 ---------------- flink-dist/src/main/assemblies/yarn.xml | 2 +- .../src/main/flink-bin/conf/flink-conf.yaml | 77 ------------------- .../apache/flink/runtime/akka/AkkaUtils.scala | 9 ++- .../flink/runtime/client/JobClient.scala | 6 +- .../flink/runtime/jobmanager/JobManager.scala | 6 +- .../runtime/jobmanager/WithWebServer.scala | 5 +- .../runtime/taskmanager/TaskManager.scala | 29 +++++-- 19 files changed, 232 insertions(+), 293 deletions(-) rename flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/{YarnMaster.scala => YarnJobManager.scala} (85%) create mode 100644 flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala delete mode 100644 flink-dist/src/main/assemblies/yarn-uberjar.xml delete mode 100644 flink-dist/src/main/flink-bin/conf/flink-conf.yaml diff --git a/flink-addons/flink-yarn/pom.xml b/flink-addons/flink-yarn/pom.xml index f1b5b2635df..2219f33d860 100644 --- a/flink-addons/flink-yarn/pom.xml +++ b/flink-addons/flink-yarn/pom.xml @@ -64,7 +64,13 @@ under the License. com.typesafe.akka akka-camel_2.10 - + + + org.apache.camel + camel-stream + 2.14.0 + + com.google.guava diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java index 587fa3431fd..b7c8e51edfc 100644 --- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java +++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java @@ -42,14 +42,12 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.client.CliFrontend; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.yarn.Messages.WaitForJobTermination$; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -73,7 +71,6 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; -import scala.concurrent.duration.Duration; /** * All classes in this package contain code taken from @@ -128,16 +125,11 @@ public class Client { public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR"; public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES"; public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME"; - public static final String ENV_AM_PRC_PORT = "_AM_PRC_PORT"; public static final String ENV_SLOTS = "_SLOTS"; public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES"; private static final String CONFIG_FILE_NAME = "flink-conf.yaml"; - /** - * Seconds to wait between each status query to the AM. - */ - private static final int CLIENT_POLLING_INTERVALL = 3; /** * Minimum memory requirements, checked by the Client. */ @@ -314,11 +306,11 @@ public class Client { + "of "+MIN_TM_MEMORY+" MB"); System.exit(1); } - + if(cmd.hasOption(SLOTS.getOpt())) { slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt())); } - + String[] dynamicProperties = null; if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) { dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt()); @@ -336,7 +328,7 @@ public class Client { LOG.warn("Unable to find job manager port in configuration!"); jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT; } - + conf = Utils.initializeYarnConfiguration(); // intialize HDFS @@ -435,7 +427,7 @@ public class Client { if(hasLog4j) { amCommand += " -Dlog4j.configuration=file:log4j.properties"; } - + amCommand += " "+ApplicationMaster.class.getName()+" " + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log" @@ -448,14 +440,14 @@ public class Client { ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); final ApplicationId appId = appContext.getApplicationId(); /** - * All network ports are offsetted by the application number + * All network ports are offsetted by the application number * to avoid version port clashes when running multiple Flink sessions * in parallel */ int appNumber = appId.getId(); jmPort = Utils.offsetPort(jmPort, appNumber); - + // Setup jar for ApplicationMaster LocalResource appMasterJar = Records.newRecord(LocalResource.class); LocalResource flinkConf = Records.newRecord(LocalResource.class); @@ -531,23 +523,23 @@ public class Client { LOG.info("Submitting application master " + appId); yarnClient.submitApplication(appContext); - ApplicationReport appReport = yarnClient.getApplicationReport(appId); Runtime.getRuntime().addShutdownHook(new ClientShutdownHook()); // start actor system LOG.info("Start actor system."); - actorSystem = AkkaUtils.createActorSystem(); + actorSystem = YarnUtils.createActorSystem(); // start application client - String path = appReport.getHost() + ":" + jmPort; LOG.info("Start application client."); - applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, path, - yarnClient)); + applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, appId, jmPort, + yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded)); - appReport = AkkaUtils.ask(applicationClient, - WaitForJobTermination$.MODULE$, - AkkaUtils.FUTURE_TIMEOUT(), Duration.Inf()); + actorSystem.awaitTermination(); + + actorSystem = null; + + ApplicationReport appReport = yarnClient.getApplicationReport(appId); LOG.info("Application " + appId + " finished with state " + appReport .getYarnApplicationState() + " and final state " + appReport @@ -564,7 +556,7 @@ public class Client { } private void stopSession() { - try { + if(actorSystem != null){ LOG.info("Sending shutdown request to the Application Master"); if(applicationClient != ActorRef.noSender()) { applicationClient.tell(new Messages.StopYarnSession(FinalApplicationStatus.KILLED), @@ -572,15 +564,20 @@ public class Client { applicationClient = ActorRef.noSender(); } - if(actorSystem != null){ - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.shutdown(); + actorSystem.awaitTermination(); - actorSystem = null; - } - } catch (Exception e) { - LOG.warn("Exception while killing the YARN application", e); + actorSystem = null; } + + try { + FileSystem shutFS = FileSystem.get(conf); + shutFS.delete(sessionFilesDir, true); // delete conf and jar file. + shutFS.close(); + }catch(IOException e){ + LOG.error("Could not delete the conf and jar files.", e); + } + try { yarnPropertiesFile.delete(); } catch (Exception e) { @@ -590,14 +587,6 @@ public class Client { yarnClient.stop(); LOG.info("Deleting files in "+sessionFilesDir ); - - try { - FileSystem shutFS = FileSystem.get(conf); - shutFS.delete(sessionFilesDir, true); // delete conf and jar file. - shutFS.close(); - }catch(IOException e){ - LOG.error("Could not delete the conf and jar files.", e); - } } public class ClientShutdownHook extends Thread { diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java index 0628722d886..33a8942b1cc 100644 --- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java +++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java @@ -23,14 +23,17 @@ import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Map; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.flink.yarn.YarnUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.yarn.Client; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import scala.Tuple2; public class YarnTaskManagerRunner { @@ -44,7 +47,7 @@ public class YarnTaskManagerRunner { // configure local directory final String[] newArgs = Arrays.copyOf(args, args.length + 2); - newArgs[newArgs.length-2] = "--configDir"; + newArgs[newArgs.length-2] = "--tempDir"; newArgs[newArgs.length-1] = localDirs; LOG.info("Setting log path "+localDirs); LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting" @@ -57,7 +60,10 @@ public class YarnTaskManagerRunner { @Override public Object run() { try { - TaskManager.main(newArgs); + Tuple2 tuple = YarnUtils + .startActorSystemAndTaskManager(newArgs); + + tuple._1().awaitTermination(); } catch (Exception e) { LOG.error("Error while running the TaskManager", e); } diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index e19045f1146..f598a0c6ce8 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -33,10 +33,10 @@ ApplicationId} import org.apache.hadoop.yarn.client.api.YarnClient import scala.concurrent.duration._ -class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnClient, +class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient, confDirPath: String, slots: Int, numTaskManagers: Int, dynamicPropertiesEncoded: String) - extends Actor with ActorLogMessages with ActorLogging with Consumer { + extends Actor with Consumer with ActorLogMessages with ActorLogging { import context._ val INITIAL_POLLING_DELAY = 0 seconds @@ -45,8 +45,6 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC val waitingChars = Array[Char]('/', '|', '\\', '-') - val terminationListeners = scala.collection.mutable.HashSet[ActorRef]() - var jobManager: Option[ActorRef] = None var pollingTimer: Option[Cancellable] = None var running = false @@ -55,11 +53,13 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC def endpointUri = "stream:in" override def preStart(): Unit = { + super.preStart() pollingTimer = Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY, WAIT_FOR_YARN_INTERVAL, self, PollYarnReport)) } override def postStop(): Unit = { + log.info("Stopped Application client.") pollingTimer foreach { _.cancel() } @@ -68,25 +68,25 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC } override def receiveWithLogMessages: Receive = { - case WaitForJobTermination => - terminationListeners += sender() case PollYarnReport => { val report = yarnClient.getApplicationReport(appId) + log.info(s"Yarn state ${report.getYarnApplicationState}, " + + s"state ${report.getFinalApplicationStatus}") + report.getYarnApplicationState match { case YarnApplicationState.FINISHED | YarnApplicationState.KILLED | YarnApplicationState .FAILED => { - terminationListeners foreach { - _ ! report - } + log.info(s"Terminate polling.") - self ! PoisonPill + context.system.shutdown() } - case YarnApplicationState.RUNNING => { - println(s"Flink JobManager is now running on $address") - println(s"JobManager Web Interface: ${report.getTrackingUrl}") + case YarnApplicationState.RUNNING if !running => { + val address = s"${report.getHost}:$port" + log.info(s"Flink JobManager is now running on $address") + log.info(s"JobManager Web Interface: ${report.getTrackingUrl}") - writeYarnProperties() + writeYarnProperties(address) jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))) jobManager.get ! RegisterMessageListener @@ -116,6 +116,7 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC println(msg) } case msg: StopYarnSession => { + log.info("Stop yarn session.") jobManager foreach { _ forward msg } @@ -136,7 +137,7 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC """.stripMargin) } - def writeYarnProperties(): Unit = { + def writeYarnProperties(address: String): Unit = { val yarnProps = new Properties() yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, address) diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index eab1b484948..628db6d7339 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -37,6 +37,9 @@ object ApplicationMaster{ val LOG = LoggerFactory.getLogger(this.getClass) + val CONF_FILE = "flink-conf.yaml" + val MODIFIED_CONF_FILE = "flink-conf-modified.yaml" + def main(args: Array[String]): Unit ={ val yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME) LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " + @@ -90,12 +93,14 @@ object ApplicationMaster{ actorSystem = system jobManager = actor + LOG.info("Start yarn session on job manager.") jobManager ! StartYarnSession(conf) + LOG.info("Await termination of actor system.") actorSystem.awaitTermination() }catch{ case t: Throwable => - LOG.error("Error while running the application master.") + LOG.error("Error while running the application master.", t) if(actorSystem != null){ actorSystem.shutdown() @@ -115,13 +120,13 @@ object ApplicationMaster{ jobManagerWebPort: Int, logDirs: String, slots: Int, taskManagerCount: Int, dynamicPropertiesEncodedString: String) : Unit = { - val output = new PrintWriter(new BufferedWriter(new FileWriter(s"$currDir/flink-conf-modified" + - s".yaml"))) + LOG.info("Generate configuration file for application master.") + val output = new PrintWriter(new BufferedWriter( + new FileWriter(s"$currDir/$MODIFIED_CONF_FILE")) + ) - val exclusions = Set(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, - ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY) - for (line <- Source.fromFile(s"$currDir/flink-conf.yaml").getLines() if !(exclusions exists - {line.contains(_)})) { + for (line <- Source.fromFile(s"$currDir/$CONF_FILE").getLines() if !(line.contains + (ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY))) { output.println(line) } @@ -149,15 +154,17 @@ object ApplicationMaster{ } def startJobManager(currDir: String): (ActorSystem, ActorRef) = { - - val pathToConfig = s"$currDir/flink-conf.modified.yaml" + LOG.info("Start job manager for yarn") + val pathToConfig = s"$currDir/$MODIFIED_CONF_FILE" val args = Array[String]("--configDir", pathToConfig) - val (hostname, port, configuration) = JobManager.initialize(args) + LOG.info(s"Config path: ${pathToConfig}.") + val (hostname, port, configuration) = JobManager.parseArgs(args) implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port, configuration) + LOG.info("Start job manager actor.") (jobManagerSystem, JobManager.startActor(Props(new JobManager(configuration) with - WithWebServer with YarnMaster))) + WithWebServer with YarnJobManager))) } } diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala index b45523bdc77..cc921652421 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala @@ -27,7 +27,6 @@ object Messages { case class YarnMessage(message: String, date: Date = new Date()) case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int) case object RegisterMessageListener - case object WaitForJobTermination case class StopYarnSession(status: FinalApplicationStatus) case class StartYarnSession(configuration: Configuration) diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMaster.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala similarity index 85% rename from flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMaster.scala rename to flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index bd26d6b1adf..a1bed1b7e09 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMaster.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.util.Records import scala.concurrent.duration._ -trait YarnMaster extends ActorLogMessages { +trait YarnJobManager extends ActorLogMessages { that: JobManager => import context._ @@ -59,16 +59,20 @@ trait YarnMaster extends ActorLogMessages { var allocatedContainers = 0 var completedContainers = 0 var numTaskManager = 0 - var actorState = startYarnSessionMessages abstract override def receiveWithLogMessages: Receive = { - actorState orElse receiveYarnMessages orElse super.receiveWithLogMessages + receiveYarnMessages orElse super.receiveWithLogMessages } def receiveYarnMessages: Receive = { case StopYarnSession(status) => - log.info("Stopping Yarn JobManager.") + log.info("Stopping Yarn Session.") + + instanceManager.getAllRegisteredInstances foreach { + instance => + instance.getTaskManager ! StopYarnSession(status) + } rmClientOption foreach { rmClient => @@ -84,22 +88,22 @@ trait YarnMaster extends ActorLogMessages { nmClientOption = None - self ! PoisonPill - - log.info("Stopped Yarn JobManager.") + context.system.shutdown() case RegisterMessageListener => messageListener = Some(sender()) - } - def startYarnSessionMessages: Receive = { case StartYarnSession(conf) => { + log.info("Start yarn session.") val memoryPerTaskManager = env.get(Client.ENV_TM_MEMORY).toInt val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager) val applicationMasterHost = env.get(Environment.NM_HOST.key) require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST} not set.") + numTaskManager = env.get(Client.ENV_TM_COUNT).toInt + log.info(s"Requesting ${numTaskManager} task managers.") + val coresPerTaskManager = env.get(Client.ENV_TM_CORES).toInt val remoteFlinkJarPath = env.get(Client.FLINK_JAR_PATH) val fs = FileSystem.get(conf) @@ -185,25 +189,20 @@ trait YarnMaster extends ActorLogMessages { yarnClientUsername, conf, taskManagerLocalResources)) context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, PollContainerCompletion) - - actorState = pollContainerCompletionMessages } - } - def pollContainerCompletionMessages: Receive = { case PollContainerCompletion => { rmClientOption match { case Some(rmClient) => { val response = rmClient.allocate(completedContainers.toFloat / numTaskManager) + for (container <- response.getAllocatedContainers) { log.info(s"Got new container for TM ${container.getId} on host ${ - container.getNodeId - .getHost - }") + container.getNodeId.getHost}") allocatedContainers += 1 - log.info(s"Launching container $allocatedContainers.") + log.info(s"Launching container #$allocatedContainers.") nmClientOption match { case Some(nmClient) => { containerLaunchContext match { @@ -219,26 +218,26 @@ trait YarnMaster extends ActorLogMessages { self ! StopYarnSession(FinalApplicationStatus.FAILED) } } + } - for (status <- response.getCompletedContainersStatuses) { - completedContainers += 1 - log.info(s"Completed container ${status.getContainerId}. Total completed " + - s"$completedContainers.") - log.info(s"Diagnostics ${status.getDiagnostics}.") + for (status <- response.getCompletedContainersStatuses) { + completedContainers += 1 + log.info(s"Completed container ${status.getContainerId}. Total completed " + + s"$completedContainers.") + log.info(s"Diagnostics ${status.getDiagnostics}.") - messageListener foreach { - _ ! YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " + - s"state=${status.getState}.\n${status.getDiagnostics}") - } + messageListener foreach { + _ ! YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " + + s"state=${status.getState}.\n${status.getDiagnostics}") } + } - if (allocatedContainers < numTaskManager) { - context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, PollContainerCompletion) - } else if (completedContainers < numTaskManager) { - context.system.scheduler.scheduleOnce(COMPLETION_DELAY, self, PollContainerCompletion) - } else { - self ! StopYarnSession(FinalApplicationStatus.FAILED) - } + if (allocatedContainers < numTaskManager) { + context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, PollContainerCompletion) + } else if (completedContainers < numTaskManager) { + context.system.scheduler.scheduleOnce(COMPLETION_DELAY, self, PollContainerCompletion) + } else { + self ! StopYarnSession(FinalApplicationStatus.FAILED) } } case None => { @@ -253,10 +252,11 @@ trait YarnMaster extends ActorLogMessages { yarnClientUsername: String, yarnConf: Configuration, taskManagerLocalResources: Map[String, LocalResource]): ContainerLaunchContext = { + log.info("Create container launch context.") val ctx = Records.newRecord(classOf[ContainerLaunchContext]) val javaOpts = configuration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "") - val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xmx ${heapLimit}m $javaOpts") + val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xmx${heapLimit}m $javaOpts") if (hasLogback || hasLog4j) { tmCommand ++= diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala new file mode 100644 index 00000000000..16527059992 --- /dev/null +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn + +import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.taskmanager.TaskManager +import org.apache.flink.yarn.Messages.StopYarnSession + +trait YarnTaskManager extends ActorLogMessages { + that: TaskManager => + + abstract override def receiveWithLogMessages: Receive = { + receiveYarnMessages orElse super.receiveWithLogMessages + } + + def receiveYarnMessages: Receive = { + case StopYarnSession(status) => { + context.system.shutdown() + } + } +} diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala index 4aaabd0462d..1fe13774e6b 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala @@ -18,10 +18,11 @@ package org.apache.flink.yarn -import akka.actor.ActorSystem +import akka.actor.{Props, ActorRef, ActorSystem} import com.typesafe.config.ConfigFactory import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.taskmanager.TaskManager object YarnUtils { def createActorSystem(hostname: String, port: Int, configuration: Configuration): ActorSystem = { @@ -31,7 +32,28 @@ object YarnUtils { AkkaUtils.createActorSystem(akkaConfig) } + def createActorSystem: ActorSystem = { + val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString + + getConfigString) + + AkkaUtils.createActorSystem(akkaConfig) + } + def getConfigString: String = { - s"""""".stripMargin + s"""akka.loglevel = "INFO" + |akka.stdout-loglevel = "INFO" + |""".stripMargin + } + + def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = { + val (hostname, port, config) = TaskManager.parseArgs(args) + + val actorSystem = createActorSystem(hostname, port, config) + + val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) = + TaskManager.parseConfiguration(hostname, config, false) + + (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, jobManagerURL, + taskManagerConfig, networkConnectionConfiguration) with YarnTaskManager))(actorSystem)) } } diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 93c066747f3..d39d2ebcfe1 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -162,6 +162,7 @@ under the License. + org.apache.maven.plugins maven-shade-plugin 2.3 @@ -181,6 +182,16 @@ under the License. org.apache.flink:flink-streaming-examples + + + org.apache.flink:flink-clients + + + web-docs/** + + + + @@ -201,33 +212,6 @@ under the License. maven-assembly-plugin 2.4 - - - - - - - - - - - - - - - - - - - - - - - - - - - yarn-bin diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index 18c8b3d32c0..9ad8e47eb2d 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -50,8 +50,15 @@ under the License. - + + + src/main/resources/flink-conf.yaml + conf + 0644 + + + src/main/flink-bin/bin diff --git a/flink-dist/src/main/assemblies/yarn-uberjar.xml b/flink-dist/src/main/assemblies/yarn-uberjar.xml deleted file mode 100644 index d677230d107..00000000000 --- a/flink-dist/src/main/assemblies/yarn-uberjar.xml +++ /dev/null @@ -1,67 +0,0 @@ - - - - - - - yarn-uberjar - - jar - - - false - - - - / - true - true - runtime - - org.apache.flink:flink-java-examples:* - org.apache.flink:flink-scala-examples:* - org.apache.flink:flink-streaming-examples:* - - - - - - - - src/main/flink-bin/conf/flink-conf.yaml - / - 0644 - - - - - - - ../flink-runtime/resources - resources - 0644 - - *etc/users - - - - diff --git a/flink-dist/src/main/assemblies/yarn.xml b/flink-dist/src/main/assemblies/yarn.xml index 6cb7d3da178..7d49b014783 100644 --- a/flink-dist/src/main/assemblies/yarn.xml +++ b/flink-dist/src/main/assemblies/yarn.xml @@ -111,7 +111,7 @@ under the License. - src/main/flink-bin/conf/flink-conf.yaml + src/main/resources/flink-conf.yaml conf 0644 diff --git a/flink-dist/src/main/flink-bin/conf/flink-conf.yaml b/flink-dist/src/main/flink-bin/conf/flink-conf.yaml deleted file mode 100644 index 65a87b97619..00000000000 --- a/flink-dist/src/main/flink-bin/conf/flink-conf.yaml +++ /dev/null @@ -1,77 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - - -#============================================================================== -# Common -#============================================================================== - -jobmanager.rpc.address: localhost - -jobmanager.rpc.port: 6123 - -jobmanager.heap.mb: 256 - -taskmanager.heap.mb: 512 - -taskmanager.numberOfTaskSlots: -1 - -parallelization.degree.default: 1 - -#============================================================================== -# Web Frontend -#============================================================================== - -jobmanager.web.port: 8081 - -webclient.port: 8080 - -#============================================================================== -# Advanced -#============================================================================== - -# The number of buffers for the network stack. -# -# taskmanager.network.numberOfBuffers: 2048 - -# Directories for temporary files. -# -# Add a delimited list for multiple directories, using the system directory -# delimiter (colon ':' on unix) or a comma, e.g.: -# /data1/tmp:/data2/tmp:/data3/tmp -# -# Note: Each directory entry is read from and written to by a different I/O -# thread. You can include the same directory multiple times in order to create -# multiple I/O threads against that directory. This is for example relevant for -# high-throughput RAIDs. -# -# If not specified, the system-specific Java temporary directory (java.io.tmpdir -# property) is taken. -# -# taskmanager.tmp.dirs: /tmp - -# Path to the Hadoop configuration directory. -# -# This configuration is used when writing into HDFS. Unless specified otherwise, -# HDFS file creation will use HDFS default settings with respect to block-size, -# replication factor, etc. -# -# You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml -# via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'. -# -# fs.hdfs.hadoopconf: /path/to/hadoop/conf/ diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index c9880ac5710..bb17ea7f803 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -38,6 +38,8 @@ object AkkaUtils { implicit val AWAIT_DURATION: FiniteDuration = 1 minute implicit val FUTURE_DURATION: FiniteDuration = 1 minute + val INF_TIMEOUT = 21474835 seconds + var globalExecutionContext: ExecutionContext = ExecutionContext.global def createActorSystem(host: String, port: Int, configuration: Configuration): ActorSystem = { @@ -110,7 +112,7 @@ object AkkaUtils { |akka.loglevel = "WARNING" |akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" |akka.stdout-loglevel = "WARNING" - |akka.jvm-exit-on-fata-error = off + |akka.jvm-exit-on-fatal-error = off |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" |akka.remote.netty.tcp.tcp-nodelay = on @@ -154,6 +156,11 @@ object AkkaUtils { Await.result(future, duration).asInstanceOf[T] } + def askInf[T](actor: ActorRef, msg: Any): T = { + val future = Patterns.ask(actor, msg, INF_TIMEOUT) + Await.result(future, INF_TIMEOUT).asInstanceOf[T] + } + def retry[T](body: => T, tries: Int)(implicit executionContext: ExecutionContext): Future[T] = { Future{ body }.recoverWith{ case t:Throwable => diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index ebbc57b4648..a9733ded13c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -115,10 +115,8 @@ object JobClient{ @throws(classOf[JobExecutionException]) def submitJobAndWait(jobGraph: JobGraph, listen: Boolean, jobClient: ActorRef): JobExecutionResult = { - import AkkaUtils.FUTURE_TIMEOUT - val response = jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listen) - - Await.result(response.mapTo[JobExecutionResult],Duration.Inf) + AkkaUtils.askInf[JobExecutionResult](jobClient, + SubmitJobAndWait(jobGraph, listenToEvents = listen)) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index a2a207cf7a4..75f2a63175b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -54,6 +54,8 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { import context._ import AkkaUtils.FUTURE_TIMEOUT + log.info("Starting job manager.") + val (archiveCount, profiling, cleanupInterval) = JobManager.parseConfiguration(configuration) // Props for the profiler actor @@ -390,7 +392,7 @@ object JobManager { val PROFILER_NAME = "profiler" def main(args: Array[String]): Unit = { - val (hostname, port, configuration) = initialize(args) + val (hostname, port, configuration) = parseArgs(args) val jobManagerSystem = AkkaUtils.createActorSystem(hostname, port, configuration) @@ -398,7 +400,7 @@ object JobManager { jobManagerSystem.awaitTermination() } - def initialize(args: Array[String]): (String, Int, Configuration) = { + def parseArgs(args: Array[String]): (String, Int, Configuration) = { val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") { head("flink jobmanager") opt[String]("configDir") action { (x, c) => c.copy(configDir = x) } text ("Specify " + diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala index 962c8fc6e9b..81fecbf700b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala @@ -28,7 +28,10 @@ trait WithWebServer extends Actor { webServer.start() abstract override def postStop(): Unit = { - that.postStop() + log.info("Stopping webserver.") webServer.stop() + log.info("Stopped webserver.") + + super.postStop() } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index f2c1bf56690..136dc7f195a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -72,6 +72,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka import AkkaUtils.FUTURE_TIMEOUT import taskManagerConfig._ + log.info(s"Starting task manager at ${self.path}.") + val REGISTRATION_DELAY = 0 seconds val REGISTRATION_INTERVAL = 10 seconds val MAX_REGISTRATION_ATTEMPTS = 10 @@ -440,13 +442,14 @@ object TaskManager { val PROFILER_NAME = "profiler" def main(args: Array[String]): Unit = { - val (hostname, port, configuration) = initialize(args) + val (hostname, port, configuration) = parseArgs(args) val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration) + taskManagerSystem.awaitTermination() } - private def initialize(args: Array[String]): (String, Int, Configuration) = { + def parseArgs(args: Array[String]): (String, Int, Configuration) = { val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") { head("flink task manager") opt[String]("configDir") action { (x, c) => @@ -483,6 +486,7 @@ object TaskManager { (hostname, port, configuration) } getOrElse { + LOG.error(s"TaskManager parseArgs called with ${args.mkString(" ")}.") LOG.error("CLI parsing failed. Usage: " + parser.usage) sys.exit(FAILURE_RETURN_CODE) } @@ -491,8 +495,12 @@ object TaskManager { def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration, localExecution: Boolean = false): (ActorSystem, ActorRef) = { implicit val actorSystem = AkkaUtils.createActorSystem(hostname, port, configuration) - (actorSystem, (startActor _).tupled(parseConfiguration(hostname, configuration, - localExecution))) + + val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) = + parseConfiguration(hostname, configuration, localExecution) + + (actorSystem, startActor(connectionInfo, jobManagerURL, taskManagerConfig, + networkConnectionConfiguration)) } def parseConfiguration(hostname: String, configuration: Configuration, @@ -600,14 +608,21 @@ object TaskManager { taskManagerConfig: TaskManagerConfiguration, networkConnectionConfiguration: NetworkConnectionConfiguration) (implicit actorSystem: ActorSystem): ActorRef = { - actorSystem.actorOf(Props(classOf[TaskManager], connectionInfo, jobManagerURL, - taskManagerConfig, networkConnectionConfiguration), TASK_MANAGER_NAME); + startActor(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, + networkConnectionConfiguration))) + } + + def startActor(props: Props)(implicit actorSystem: ActorSystem): ActorRef = { + actorSystem.actorOf(props, TASK_MANAGER_NAME) } def startActorWithConfiguration(hostname: String, configuration: Configuration, localExecution: Boolean = false) (implicit system: ActorSystem) = { - (startActor _).tupled(parseConfiguration(hostname, configuration, localExecution)) + val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) = + parseConfiguration(hostname, configuration, localExecution) + + startActor(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) } def startProfiler(instancePath: String, reportInterval: Long)(implicit system: ActorSystem): -- GitLab