Commit 0ae72041 authored by Till Rohrmann

Implemented proper shutdown of yarn containers if the system is shutdown via the yarn client.

Parent 76d603c0
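
In outline, the commit turns shutdown into a message cascade: the YARN client tells the ApplicationClient to stop the session, the ApplicationClient forwards the request to the JobManager, and the JobManager fans it out to every registered TaskManager before stopping itself. Below is a minimal, self-contained sketch of that flow; the stub names are hypothetical stand-ins for the real Client, ApplicationClient, YarnJobManager and YarnTaskManager shown in the diff, and the old Akka shutdown API matches what the codebase used at the time.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Stand-in for org.apache.flink.yarn.Messages.StopYarnSession(status).
case class StopYarnSession(status: String)

// Each TaskManager container simply tears down its actor system on receipt.
class TaskManagerStub extends Actor {
  def receive = {
    case StopYarnSession(_) => context.system.shutdown()
  }
}

// The JobManager fans the message out to all registered TaskManagers
// (deregistration from YARN elided), then stops its own system.
class JobManagerStub(taskManagers: Seq[ActorRef]) extends Actor {
  def receive = {
    case msg: StopYarnSession =>
      taskManagers foreach { _ ! msg }
      context.system.shutdown()
  }
}

// The client-side ApplicationClient just forwards the request.
class ApplicationClientStub(jobManager: ActorRef) extends Actor {
  def receive = {
    case msg: StopYarnSession => jobManager forward msg
  }
}

object ShutdownCascadeDemo extends App {
  // In the real deployment each stub runs in its own container with its own
  // ActorSystem; a single system suffices for this demo.
  val system = ActorSystem("demo")
  val tms = (1 to 2).map(i => system.actorOf(Props[TaskManagerStub], s"tm$i"))
  val jm = system.actorOf(Props(new JobManagerStub(tms)), "jobmanager")
  val client = system.actorOf(Props(new ApplicationClientStub(jm)), "applicationClient")
  client ! StopYarnSession("KILLED")
  system.awaitTermination()
}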
......@@ -65,6 +65,12 @@ under the License.
<artifactId>akka-camel_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-stream</artifactId>
<version>2.14.0</version>
</dependency>
<!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
<dependency>
<groupId>com.google.guava</groupId>
......
......@@ -42,14 +42,12 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.Messages.WaitForJobTermination$;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
......@@ -73,7 +71,6 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import scala.concurrent.duration.Duration;
/**
* All classes in this package contain code taken from
......@@ -128,16 +125,11 @@ public class Client {
public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
public static final String ENV_AM_PRC_PORT = "_AM_PRC_PORT";
public static final String ENV_SLOTS = "_SLOTS";
public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
/**
* Seconds to wait between each status query to the AM.
*/
private static final int CLIENT_POLLING_INTERVALL = 3;
/**
* Minimum memory requirements, checked by the Client.
*/
......@@ -531,23 +523,23 @@ public class Client {
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
Runtime.getRuntime().addShutdownHook(new ClientShutdownHook());
// start actor system
LOG.info("Start actor system.");
actorSystem = AkkaUtils.createActorSystem();
actorSystem = YarnUtils.createActorSystem();
// start application client
String path = appReport.getHost() + ":" + jmPort;
LOG.info("Start application client.");
applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, path,
yarnClient));
applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, appId, jmPort,
yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded));
actorSystem.awaitTermination();
actorSystem = null;
appReport = AkkaUtils.ask(applicationClient,
WaitForJobTermination$.MODULE$,
AkkaUtils.FUTURE_TIMEOUT(), Duration.Inf());
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
LOG.info("Application " + appId + " finished with state " + appReport
.getYarnApplicationState() + " and final state " + appReport
......@@ -564,7 +556,7 @@ public class Client {
}
private void stopSession() {
try {
if(actorSystem != null){
LOG.info("Sending shutdown request to the Application Master");
if(applicationClient != ActorRef.noSender()) {
applicationClient.tell(new Messages.StopYarnSession(FinalApplicationStatus.KILLED),
......@@ -572,15 +564,20 @@ public class Client {
applicationClient = ActorRef.noSender();
}
if(actorSystem != null){
actorSystem.shutdown();
actorSystem.awaitTermination();
actorSystem = null;
}
} catch (Exception e) {
LOG.warn("Exception while killing the YARN application", e);
try {
FileSystem shutFS = FileSystem.get(conf);
shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
shutFS.close();
}catch(IOException e){
LOG.error("Could not delete the conf and jar files.", e);
}
try {
yarnPropertiesFile.delete();
} catch (Exception e) {
......@@ -590,14 +587,6 @@ public class Client {
yarnClient.stop();
LOG.info("Deleting files in "+sessionFilesDir );
try {
FileSystem shutFS = FileSystem.get(conf);
shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
shutFS.close();
}catch(IOException e){
LOG.error("Could not delete the conf and jar files.", e);
}
}
public class ClientShutdownHook extends Thread {
......
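
For context: the ClientShutdownHook above ties this sequence to JVM shutdown, so interrupting the YARN client (e.g. Ctrl-C) still sends the stop request and cleans up the staged files. A minimal sketch of that pattern, with a hypothetical stopSession body standing in for Client.stopSession():

object ShutdownHookSketch {
  // Hypothetical stand-in for Client.stopSession(): sends StopYarnSession,
  // shuts down the actor system and deletes the staged conf/jar files.
  def stopSession(): Unit = println("shutting down YARN session")

  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit = stopSession()
    })
    // ... run the session until the user interrupts the client ...
  }
}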
......@@ -23,14 +23,17 @@ import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Map;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.apache.flink.yarn.YarnUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.yarn.Client;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import scala.Tuple2;
public class YarnTaskManagerRunner {
......@@ -44,7 +47,7 @@ public class YarnTaskManagerRunner {
// configure local directory
final String[] newArgs = Arrays.copyOf(args, args.length + 2);
newArgs[newArgs.length-2] = "--configDir";
newArgs[newArgs.length-2] = "--tempDir";
newArgs[newArgs.length-1] = localDirs;
LOG.info("Setting log path "+localDirs);
LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
......@@ -57,7 +60,10 @@ public class YarnTaskManagerRunner {
@Override
public Object run() {
try {
TaskManager.main(newArgs);
Tuple2<ActorSystem, ActorRef> tuple = YarnUtils
.startActorSystemAndTaskManager(newArgs);
tuple._1().awaitTermination();
} catch (Exception e) {
LOG.error("Error while running the TaskManager", e);
}
......
......@@ -33,10 +33,10 @@ ApplicationId}
import org.apache.hadoop.yarn.client.api.YarnClient
import scala.concurrent.duration._
class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnClient,
class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
confDirPath: String, slots: Int, numTaskManagers: Int,
dynamicPropertiesEncoded: String)
extends Actor with ActorLogMessages with ActorLogging with Consumer {
extends Actor with Consumer with ActorLogMessages with ActorLogging {
import context._
val INITIAL_POLLING_DELAY = 0 seconds
......@@ -45,8 +45,6 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC
val waitingChars = Array[Char]('/', '|', '\\', '-')
val terminationListeners = scala.collection.mutable.HashSet[ActorRef]()
var jobManager: Option[ActorRef] = None
var pollingTimer: Option[Cancellable] = None
var running = false
......@@ -55,11 +53,13 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC
def endpointUri = "stream:in"
override def preStart(): Unit = {
super.preStart()
pollingTimer = Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY,
WAIT_FOR_YARN_INTERVAL, self, PollYarnReport))
}
override def postStop(): Unit = {
log.info("Stopped Application client.")
pollingTimer foreach {
_.cancel()
}
......@@ -68,25 +68,25 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC
}
override def receiveWithLogMessages: Receive = {
case WaitForJobTermination =>
terminationListeners += sender()
case PollYarnReport => {
val report = yarnClient.getApplicationReport(appId)
log.info(s"Yarn state ${report.getYarnApplicationState}, " +
s"state ${report.getFinalApplicationStatus}")
report.getYarnApplicationState match {
case YarnApplicationState.FINISHED | YarnApplicationState.KILLED | YarnApplicationState
.FAILED => {
terminationListeners foreach {
_ ! report
}
log.info(s"Terminate polling.")
self ! PoisonPill
context.system.shutdown()
}
case YarnApplicationState.RUNNING => {
println(s"Flink JobManager is now running on $address")
println(s"JobManager Web Interface: ${report.getTrackingUrl}")
case YarnApplicationState.RUNNING if !running => {
val address = s"${report.getHost}:$port"
log.info(s"Flink JobManager is now running on $address")
log.info(s"JobManager Web Interface: ${report.getTrackingUrl}")
writeYarnProperties()
writeYarnProperties(address)
jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address)))
jobManager.get ! RegisterMessageListener
......@@ -116,6 +116,7 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC
println(msg)
}
case msg: StopYarnSession => {
log.info("Stop yarn session.")
jobManager foreach {
_ forward msg
}
......@@ -136,7 +137,7 @@ class ApplicationClient(appId: ApplicationId, address: String, yarnClient: YarnC
""".stripMargin)
}
def writeYarnProperties(): Unit = {
def writeYarnProperties(address: String): Unit = {
val yarnProps = new Properties()
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, address)
......
......@@ -37,6 +37,9 @@ object ApplicationMaster{
val LOG = LoggerFactory.getLogger(this.getClass)
val CONF_FILE = "flink-conf.yaml"
val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
def main(args: Array[String]): Unit ={
val yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME)
LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
......@@ -90,12 +93,14 @@ object ApplicationMaster{
actorSystem = system
jobManager = actor
LOG.info("Start yarn session on job manager.")
jobManager ! StartYarnSession(conf)
LOG.info("Await termination of actor system.")
actorSystem.awaitTermination()
}catch{
case t: Throwable =>
LOG.error("Error while running the application master.")
LOG.error("Error while running the application master.", t)
if(actorSystem != null){
actorSystem.shutdown()
......@@ -115,13 +120,13 @@ object ApplicationMaster{
jobManagerWebPort: Int, logDirs: String, slots: Int,
taskManagerCount: Int, dynamicPropertiesEncodedString: String)
: Unit = {
val output = new PrintWriter(new BufferedWriter(new FileWriter(s"$currDir/flink-conf-modified" +
s".yaml")))
LOG.info("Generate configuration file for application master.")
val output = new PrintWriter(new BufferedWriter(
new FileWriter(s"$currDir/$MODIFIED_CONF_FILE"))
)
val exclusions = Set(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)
for (line <- Source.fromFile(s"$currDir/flink-conf.yaml").getLines() if !(exclusions exists
{line.contains(_)})) {
for (line <- Source.fromFile(s"$currDir/$CONF_FILE").getLines() if !(line.contains
(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY))) {
output.println(line)
}
......@@ -149,15 +154,17 @@ object ApplicationMaster{
}
def startJobManager(currDir: String): (ActorSystem, ActorRef) = {
val pathToConfig = s"$currDir/flink-conf.modified.yaml"
LOG.info("Start job manager for yarn")
val pathToConfig = s"$currDir/$MODIFIED_CONF_FILE"
val args = Array[String]("--configDir", pathToConfig)
val (hostname, port, configuration) = JobManager.initialize(args)
LOG.info(s"Config path: ${pathToConfig}.")
val (hostname, port, configuration) = JobManager.parseArgs(args)
implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port, configuration)
LOG.info("Start job manager actor.")
(jobManagerSystem, JobManager.startActor(Props(new JobManager(configuration) with
WithWebServer with YarnMaster)))
WithWebServer with YarnJobManager)))
}
}
......@@ -27,7 +27,6 @@ object Messages {
case class YarnMessage(message: String, date: Date = new Date())
case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int)
case object RegisterMessageListener
case object WaitForJobTermination
case class StopYarnSession(status: FinalApplicationStatus)
case class StartYarnSession(configuration: Configuration)
......
......@@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.util.Records
import scala.concurrent.duration._
trait YarnMaster extends ActorLogMessages {
trait YarnJobManager extends ActorLogMessages {
that: JobManager =>
import context._
......@@ -59,16 +59,20 @@ trait YarnMaster extends ActorLogMessages {
var allocatedContainers = 0
var completedContainers = 0
var numTaskManager = 0
var actorState = startYarnSessionMessages
abstract override def receiveWithLogMessages: Receive = {
actorState orElse receiveYarnMessages orElse super.receiveWithLogMessages
receiveYarnMessages orElse super.receiveWithLogMessages
}
def receiveYarnMessages: Receive = {
case StopYarnSession(status) =>
log.info("Stopping Yarn JobManager.")
log.info("Stopping Yarn Session.")
instanceManager.getAllRegisteredInstances foreach {
instance =>
instance.getTaskManager ! StopYarnSession(status)
}
rmClientOption foreach {
rmClient =>
......@@ -84,22 +88,22 @@ trait YarnMaster extends ActorLogMessages {
nmClientOption = None
self ! PoisonPill
log.info("Stopped Yarn JobManager.")
context.system.shutdown()
case RegisterMessageListener =>
messageListener = Some(sender())
}
def startYarnSessionMessages: Receive = {
case StartYarnSession(conf) => {
log.info("Start yarn session.")
val memoryPerTaskManager = env.get(Client.ENV_TM_MEMORY).toInt
val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager)
val applicationMasterHost = env.get(Environment.NM_HOST.key)
require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST}) not set.")
numTaskManager = env.get(Client.ENV_TM_COUNT).toInt
log.info(s"Requesting ${numTaskManager} task managers.")
val coresPerTaskManager = env.get(Client.ENV_TM_CORES).toInt
val remoteFlinkJarPath = env.get(Client.FLINK_JAR_PATH)
val fs = FileSystem.get(conf)
......@@ -185,25 +189,20 @@ trait YarnMaster extends ActorLogMessages {
yarnClientUsername, conf, taskManagerLocalResources))
context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, PollContainerCompletion)
actorState = pollContainerCompletionMessages
}
}
def pollContainerCompletionMessages: Receive = {
case PollContainerCompletion => {
rmClientOption match {
case Some(rmClient) => {
val response = rmClient.allocate(completedContainers.toFloat / numTaskManager)
for (container <- response.getAllocatedContainers) {
log.info(s"Got new container for TM ${container.getId} on host ${
container.getNodeId
.getHost
}")
container.getNodeId.getHost}")
allocatedContainers += 1
log.info(s"Launching container $allocatedContainers.")
log.info(s"Launching container #$allocatedContainers.")
nmClientOption match {
case Some(nmClient) => {
containerLaunchContext match {
......@@ -219,6 +218,7 @@ trait YarnMaster extends ActorLogMessages {
self ! StopYarnSession(FinalApplicationStatus.FAILED)
}
}
}
for (status <- response.getCompletedContainersStatuses) {
completedContainers += 1
......@@ -240,7 +240,6 @@ trait YarnMaster extends ActorLogMessages {
self ! StopYarnSession(FinalApplicationStatus.FAILED)
}
}
}
case None => {
log.error("The AMRMClient was not set.")
self ! StopYarnSession(FinalApplicationStatus.FAILED)
......@@ -253,10 +252,11 @@ trait YarnMaster extends ActorLogMessages {
yarnClientUsername: String, yarnConf: Configuration,
taskManagerLocalResources: Map[String, LocalResource]):
ContainerLaunchContext = {
log.info("Create container launch context.")
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
val javaOpts = configuration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xmx ${heapLimit}m $javaOpts")
val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xmx${heapLimit}m $javaOpts")
if (hasLogback || hasLog4j) {
tmCommand ++=
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.yarn.Messages.StopYarnSession
trait YarnTaskManager extends ActorLogMessages {
that: TaskManager =>
abstract override def receiveWithLogMessages: Receive = {
receiveYarnMessages orElse super.receiveWithLogMessages
}
def receiveYarnMessages: Receive = {
case StopYarnSession(status) => {
context.system.shutdown()
}
}
}
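
The new YarnTaskManager above is a stackable actor trait: its abstract override handles the YARN messages first and falls back to the base TaskManager behaviour via super. A minimal, self-contained sketch of that composition pattern (trait and message names here are hypothetical; only the receiveWithLogMessages chaining mirrors the code above):

import akka.actor.{Actor, ActorSystem, Props}

trait BaseActor extends Actor {
  // Base behaviour, analogous to TaskManager.receiveWithLogMessages.
  def receiveWithLogMessages: Receive = {
    case msg => println(s"base handles: $msg")
  }
  def receive: Receive = receiveWithLogMessages
}

trait YarnOverlay extends BaseActor {
  // Handle YARN messages first; fall back to the base behaviour otherwise.
  abstract override def receiveWithLogMessages: Receive =
    receiveYarnMessages orElse super.receiveWithLogMessages

  def receiveYarnMessages: Receive = {
    case "StopYarnSession" => context.system.shutdown()
  }
}

object StackableTraitDemo extends App {
  val system = ActorSystem("demo")
  val actor = system.actorOf(Props(new BaseActor with YarnOverlay {}), "worker")
  actor ! "hello"            // falls through to the base receive
  actor ! "StopYarnSession"  // intercepted by the YARN overlay
  system.awaitTermination()
}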
......@@ -18,10 +18,11 @@
package org.apache.flink.yarn
import akka.actor.ActorSystem
import akka.actor.{Props, ActorRef, ActorSystem}
import com.typesafe.config.ConfigFactory
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.taskmanager.TaskManager
object YarnUtils {
def createActorSystem(hostname: String, port: Int, configuration: Configuration): ActorSystem = {
......@@ -31,7 +32,28 @@ object YarnUtils {
AkkaUtils.createActorSystem(akkaConfig)
}
def createActorSystem: ActorSystem = {
val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString +
getConfigString)
AkkaUtils.createActorSystem(akkaConfig)
}
def getConfigString: String = {
s"""""".stripMargin
s"""akka.loglevel = "INFO"
|akka.stdout-loglevel = "INFO"
|""".stripMargin
}
def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = {
val (hostname, port, config) = TaskManager.parseArgs(args)
val actorSystem = createActorSystem(hostname, port, config)
val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) =
TaskManager.parseConfiguration(hostname, config, false)
(actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, jobManagerURL,
taskManagerConfig, networkConnectionConfiguration) with YarnTaskManager))(actorSystem))
}
}
......@@ -162,6 +162,7 @@ under the License.
<build>
<plugins>
<plugin>
<!--Build uber jar-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
......@@ -181,6 +182,16 @@ under the License.
<exclude>org.apache.flink:flink-streaming-examples</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.flink:flink-clients</artifact>
<excludes>
<exclude>
web-docs/**
</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
......@@ -201,33 +212,6 @@ under the License.
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<executions>
<!--&lt;!&ndash; Uber-jar &ndash;&gt;-->
<!--<execution>-->
<!--<id>uber-jar</id>-->
<!--<phase>package</phase>-->
<!--<goals>-->
<!--<goal>single</goal>-->
<!--</goals>-->
<!--<configuration>-->
<!--<archiverConfig>-->
<!--&lt;!&ndash; https://jira.codehaus.org/browse/MASSEMBLY-449 &ndash;&gt;-->
<!--<fileMode>420</fileMode> &lt;!&ndash; 420(dec) = 644(oct) &ndash;&gt;-->
<!--<directoryMode>493</directoryMode> &lt;!&ndash; 493(dec) = 755(oct) &ndash;&gt;-->
<!--<defaultDirectoryMode>493</defaultDirectoryMode>-->
<!--</archiverConfig>-->
<!--<archive>-->
<!--<manifest>-->
<!--<mainClass>org.apache.flink.yarn.Client</mainClass>-->
<!--<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>-->
<!--<addDefaultImplementationEntries>true</addDefaultImplementationEntries>-->
<!--</manifest>-->
<!--</archive>-->
<!--<descriptors>-->
<!--<descriptor>src/main/assemblies/yarn-uberjar.xml</descriptor>-->
<!--</descriptors>-->
<!--</configuration>-->
<!--</execution>-->
<!-- yarn bin directory -->
<execution>
<id>yarn-bin</id>
......
......@@ -50,8 +50,15 @@ under the License.
</dependencySet>
</dependencySets>
<fileSets>
<files>
<file>
<source>src/main/resources/flink-conf.yaml</source>
<outputDirectory>conf</outputDirectory>
<fileMode>0644</fileMode>
</file>
</files>
<fileSets>
<fileSet>
<!-- copy start scripts -->
<directory>src/main/flink-bin/bin</directory>
......
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<!-- Note: This file has been copied and adapted from: http://maven.apache.org/plugins/maven-assembly-plugin/descriptor-refs.html#jar-with-dependencies -->
<id>yarn-uberjar</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
<excludes>
<exclude>org.apache.flink:flink-java-examples:*</exclude>
<exclude>org.apache.flink:flink-scala-examples:*</exclude>
<exclude>org.apache.flink:flink-streaming-examples:*</exclude>
</excludes>
</dependencySet>
</dependencySets>
<files>
<!-- copy default configuration -->
<file>
<source>src/main/flink-bin/conf/flink-conf.yaml</source>
<outputDirectory>/</outputDirectory>
<fileMode>0644</fileMode>
</file>
</files>
<fileSets>
<!-- copy files for Jobmanager web frontend -->
<fileSet>
<directory>../flink-runtime/resources</directory>
<outputDirectory>resources</outputDirectory>
<fileMode>0644</fileMode>
<excludes>
<exclude>*etc/users</exclude>
</excludes>
</fileSet>
</fileSets>
</assembly>
......@@ -111,7 +111,7 @@ under the License.
<files>
<!-- copy default configuration -->
<file>
<source>src/main/flink-bin/conf/flink-conf.yaml</source>
<source>src/main/resources/flink-conf.yaml</source>
<outputDirectory>conf</outputDirectory>
<fileMode>0644</fileMode>
</file>
......
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: -1
parallelization.degree.default: 1
#==============================================================================
# Web Frontend
#==============================================================================
jobmanager.web.port: 8081
webclient.port: 8080
#==============================================================================
# Advanced
#==============================================================================
# The number of buffers for the network stack.
#
# taskmanager.network.numberOfBuffers: 2048
# Directories for temporary files.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
# /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# If not specified, the system-specific Java temporary directory (java.io.tmpdir
# property) is taken.
#
# taskmanager.tmp.dirs: /tmp
# Path to the Hadoop configuration directory.
#
# This configuration is used when writing into HDFS. Unless specified otherwise,
# HDFS file creation will use HDFS default settings with respect to block-size,
# replication factor, etc.
#
# You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml
# via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
#
# fs.hdfs.hadoopconf: /path/to/hadoop/conf/
......@@ -38,6 +38,8 @@ object AkkaUtils {
implicit val AWAIT_DURATION: FiniteDuration = 1 minute
implicit val FUTURE_DURATION: FiniteDuration = 1 minute
val INF_TIMEOUT = 21474835 seconds
var globalExecutionContext: ExecutionContext = ExecutionContext.global
def createActorSystem(host: String, port: Int, configuration: Configuration): ActorSystem = {
......@@ -110,7 +112,7 @@ object AkkaUtils {
|akka.loglevel = "WARNING"
|akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
|akka.stdout-loglevel = "WARNING"
|akka.jvm-exit-on-fata-error = off
|akka.jvm-exit-on-fatal-error = off
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
|akka.remote.netty.tcp.tcp-nodelay = on
......@@ -154,6 +156,11 @@ object AkkaUtils {
Await.result(future, duration).asInstanceOf[T]
}
def askInf[T](actor: ActorRef, msg: Any): T = {
val future = Patterns.ask(actor, msg, INF_TIMEOUT)
Await.result(future, INF_TIMEOUT).asInstanceOf[T]
}
def retry[T](body: => T, tries: Int)(implicit executionContext: ExecutionContext): Future[T] = {
Future{ body }.recoverWith{
case t:Throwable =>
......
......@@ -115,10 +115,8 @@ object JobClient{
@throws(classOf[JobExecutionException])
def submitJobAndWait(jobGraph: JobGraph, listen: Boolean,
jobClient: ActorRef): JobExecutionResult = {
import AkkaUtils.FUTURE_TIMEOUT
val response = jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listen)
Await.result(response.mapTo[JobExecutionResult],Duration.Inf)
AkkaUtils.askInf[JobExecutionResult](jobClient,
SubmitJobAndWait(jobGraph, listenToEvents = listen))
}
......
......@@ -54,6 +54,8 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
import context._
import AkkaUtils.FUTURE_TIMEOUT
log.info("Starting job manager.")
val (archiveCount, profiling, cleanupInterval) = JobManager.parseConfiguration(configuration)
// Props for the profiler actor
......@@ -390,7 +392,7 @@ object JobManager {
val PROFILER_NAME = "profiler"
def main(args: Array[String]): Unit = {
val (hostname, port, configuration) = initialize(args)
val (hostname, port, configuration) = parseArgs(args)
val jobManagerSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
......@@ -398,7 +400,7 @@ object JobManager {
jobManagerSystem.awaitTermination()
}
def initialize(args: Array[String]): (String, Int, Configuration) = {
def parseArgs(args: Array[String]): (String, Int, Configuration) = {
val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") {
head("flink jobmanager")
opt[String]("configDir") action { (x, c) => c.copy(configDir = x) } text ("Specify " +
......
......@@ -28,7 +28,10 @@ trait WithWebServer extends Actor {
webServer.start()
abstract override def postStop(): Unit = {
that.postStop()
log.info("Stopping webserver.")
webServer.stop()
log.info("Stopped webserver.")
super.postStop()
}
}
......@@ -72,6 +72,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
import AkkaUtils.FUTURE_TIMEOUT
import taskManagerConfig._
log.info(s"Starting task manager at ${self.path}.")
val REGISTRATION_DELAY = 0 seconds
val REGISTRATION_INTERVAL = 10 seconds
val MAX_REGISTRATION_ATTEMPTS = 10
......@@ -440,13 +442,14 @@ object TaskManager {
val PROFILER_NAME = "profiler"
def main(args: Array[String]): Unit = {
val (hostname, port, configuration) = initialize(args)
val (hostname, port, configuration) = parseArgs(args)
val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration)
taskManagerSystem.awaitTermination()
}
private def initialize(args: Array[String]): (String, Int, Configuration) = {
def parseArgs(args: Array[String]): (String, Int, Configuration) = {
val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") {
head("flink task manager")
opt[String]("configDir") action { (x, c) =>
......@@ -483,6 +486,7 @@ object TaskManager {
(hostname, port, configuration)
} getOrElse {
LOG.error(s"TaskManager parseArgs called with ${args.mkString(" ")}.")
LOG.error("CLI parsing failed. Usage: " + parser.usage)
sys.exit(FAILURE_RETURN_CODE)
}
......@@ -491,8 +495,12 @@ object TaskManager {
def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration,
localExecution: Boolean = false): (ActorSystem, ActorRef) = {
implicit val actorSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
(actorSystem, (startActor _).tupled(parseConfiguration(hostname, configuration,
localExecution)))
val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) =
parseConfiguration(hostname, configuration, localExecution)
(actorSystem, startActor(connectionInfo, jobManagerURL, taskManagerConfig,
networkConnectionConfiguration))
}
def parseConfiguration(hostname: String, configuration: Configuration,
......@@ -600,14 +608,21 @@ object TaskManager {
taskManagerConfig: TaskManagerConfiguration,
networkConnectionConfiguration: NetworkConnectionConfiguration)
(implicit actorSystem: ActorSystem): ActorRef = {
actorSystem.actorOf(Props(classOf[TaskManager], connectionInfo, jobManagerURL,
taskManagerConfig, networkConnectionConfiguration), TASK_MANAGER_NAME);
startActor(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
networkConnectionConfiguration)))
}
def startActor(props: Props)(implicit actorSystem: ActorSystem): ActorRef = {
actorSystem.actorOf(props, TASK_MANAGER_NAME)
}
def startActorWithConfiguration(hostname: String, configuration: Configuration,
localExecution: Boolean = false)
(implicit system: ActorSystem) = {
(startActor _).tupled(parseConfiguration(hostname, configuration, localExecution))
val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) =
parseConfiguration(hostname, configuration, localExecution)
startActor(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration)
}
def startProfiler(instancePath: String, reportInterval: Long)(implicit system: ActorSystem):
......