Commit 5a2ca819 authored by Till Rohrmann, committed by Stephan Ewen

[FLINK-1923] [runtime] Replaces asynchronous logging with synchronous logging using grizzled-slf4j wrapper for Scala.

This closes #628
Parent ccd574a4
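For context: Akka's `ActorLogging` hands log events to the actor system's event stream, where a logging actor writes them asynchronously, while grizzled-slf4j calls the underlying SLF4J logger directly on the calling thread. A minimal sketch of the two styles (the class names are illustrative, not from this commit):

```scala
import akka.actor.{Actor, ActorLogging}
import grizzled.slf4j.Logger

// Before: the LoggingAdapter from ActorLogging publishes log events to the
// event stream; a logging actor writes them asynchronously, and arguments
// are formatted with "{}" placeholders.
class AsyncStyle extends Actor with ActorLogging {
  def receive: Receive = { case msg => log.info("Received message {}", msg) }
}

// After: grizzled-slf4j wraps an SLF4J logger and logs synchronously on the
// calling thread; Scala string interpolation replaces the placeholders.
class SyncStyle extends Actor {
  lazy val log = Logger(getClass)
  def receive: Receive = { case msg => log.info(s"Received message $msg") }
}
```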
......@@ -308,6 +308,7 @@ BSD-style licenses:
- Scala Compiler Reflect (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- Scala Quasiquotes (http://scalamacros.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- ASM (http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
- Grizzled SLF4J (http://software.clapper.org/grizzled-slf4j/) - Copyright (c) 2010 Brian M. Clapper
(Below is the 3-clause BSD license)
......
......@@ -147,6 +147,11 @@ under the License.
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.clapper</groupId>
<artifactId>grizzled-slf4j_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
......
......@@ -30,7 +30,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.util.ExceptionUtils;
......@@ -357,16 +356,6 @@ public class Task {
taskManager.tell(new UnregisterTask(executionId), ActorRef.noSender());
}
protected void notifyExecutionStateChange(ExecutionState executionState,
Throwable optionalError) {
LOG.info("Update execution state of {} ({}) to {}.", this.getTaskName(),
this.getExecutionId(), executionState);
taskManager.tell(new TaskMessages.UpdateTaskExecutionState(
new TaskExecutionState(jobId, executionId, executionState, optionalError)),
ActorRef.noSender());
}
// -----------------------------------------------------------------------------------------------------------------
// Task Profiling
// -----------------------------------------------------------------------------------------------------------------
......
......@@ -19,13 +19,12 @@
package org.apache.flink.runtime
import _root_.akka.actor.Actor
import _root_.akka.event.LoggingAdapter
/**
* Mixin to add debug message logging
*/
trait ActorLogMessages {
that: Actor =>
that: Actor with ActorSynchronousLogging =>
override def receive: Receive = new Actor.Receive {
private val _receiveWithLogMessages = receiveWithLogMessages
......@@ -50,6 +49,4 @@ trait ActorLogMessages {
}
def receiveWithLogMessages: Receive
protected def log: LoggingAdapter
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime
import _root_.akka.actor.Actor
import grizzled.slf4j.Logger
/** Adds a logger to an [[akka.actor.Actor]] implementation
*
*/
trait ActorSynchronousLogging {
self: Actor =>
lazy val log = Logger(getClass)
}
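Any actor can now obtain a synchronous logger by mixing in this trait; `lazy val log = Logger(getClass)` names the logger after the concrete actor class and defers its creation until first use. A hypothetical example combining it with `ActorLogMessages` (`EchoActor` is illustrative):

```scala
import akka.actor.Actor
import org.apache.flink.runtime.{ActorLogMessages, ActorSynchronousLogging}

// Hypothetical actor: ActorSynchronousLogging supplies the `log` field that
// the tightened ActorLogMessages self-type now requires.
class EchoActor extends Actor with ActorLogMessages with ActorSynchronousLogging {
  override def receiveWithLogMessages: Receive = {
    case msg =>
      log.info(s"Echoing $msg")
      sender() ! msg
  }
}
```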
......@@ -23,6 +23,7 @@ import java.net.InetSocketAddress
import java.util.Collections
import akka.actor.Status.{Success, Failure}
import grizzled.slf4j.Logger
import org.apache.flink.api.common.{JobID, ExecutionConfig}
import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
import org.apache.flink.core.io.InputSplitAssigner
......@@ -42,7 +43,7 @@ import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
......@@ -101,10 +102,7 @@ class JobManager(val flinkConfiguration: Configuration,
val defaultExecutionRetries: Int,
val delayBetweenRetries: Long,
val timeout: FiniteDuration)
extends Actor with ActorLogMessages with ActorLogging {
/** Reference to the log, for debugging */
val LOG = JobManager.LOG
extends Actor with ActorLogMessages with ActorSynchronousLogging {
/** List of currently running jobs */
val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
......@@ -114,7 +112,7 @@ class JobManager(val flinkConfiguration: Configuration,
* Run when the job manager is started. Simply logs an informational message.
*/
override def preStart(): Unit = {
LOG.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
log.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
}
override def postStop(): Unit = {
......@@ -138,12 +136,11 @@ class JobManager(val flinkConfiguration: Configuration,
try {
libraryCacheManager.shutdown()
} catch {
case e: IOException => log.error(e, "Could not properly shutdown the library cache manager.")
case e: IOException => log.error("Could not properly shutdown the library cache manager.", e)
}
if (log.isDebugEnabled) {
log.debug("Job manager {} is completely stopped.", self.path)
}
log.debug(s"Job manager ${self.path} is completely stopped.")
}
/**
......@@ -179,7 +176,7 @@ class JobManager(val flinkConfiguration: Configuration,
// registerTaskManager throws an IllegalStateException if it is already shut down
// let the actor crash and restart itself in this case
case e: Exception =>
log.error(e, "Failed to register TaskManager at instance manager")
log.error("Failed to register TaskManager at instance manager", e)
// IMPORTANT: Send the response to the "sender", which is not the
// TaskManager actor, but the ask future!
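The hunk above shows the mechanical pattern repeated throughout this file: Akka's `LoggingAdapter` takes the `Throwable` first, formats with `{}` placeholders, and names the warn level `warning`; grizzled-slf4j follows the SLF4J convention of message first and cause last, with Scala string interpolation and `warn`. A side-by-side sketch under those assumptions:

```scala
import akka.event.LoggingAdapter
import grizzled.slf4j.Logger

object LoggingStyles {
  // Before: Akka LoggingAdapter; cause first, "{}" placeholders, `warning`.
  def akkaStyle(log: LoggingAdapter, e: Throwable, executionID: String): Unit = {
    log.error(e, "Failed to register TaskManager at instance manager")
    log.warning("Could not find environment for execution id {}.", executionID)
  }

  // After: grizzled-slf4j; message first, cause last, s-interpolation, `warn`.
  def grizzledStyle(log: Logger, e: Throwable, executionID: String): Unit = {
    log.error("Failed to register TaskManager at instance manager", e)
    log.warn(s"Could not find environment for execution id $executionID.")
  }
}
```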
......@@ -197,7 +194,7 @@ class JobManager(val flinkConfiguration: Configuration,
submitJob(jobGraph, listenToEvents = listen)
case CancelJob(jobID) =>
log.info("Trying to cancel job with ID {}.", jobID)
log.info(s"Trying to cancel job with ID $jobID.")
currentJobs.get(jobID) match {
case Some((executionGraph, _)) =>
......@@ -208,7 +205,7 @@ class JobManager(val flinkConfiguration: Configuration,
sender ! CancellationSuccess(jobID)
case None =>
log.info("No job found with ID {}.", jobID)
log.info(s"No job found with ID $jobID.")
sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " +
s"ID $jobID."))
}
......@@ -227,8 +224,9 @@ class JobManager(val flinkConfiguration: Configuration,
}(context.dispatcher)
sender ! true
case None => log.error("Cannot find execution graph for ID {} to change state to {}.",
taskExecutionState.getJobID, taskExecutionState.getExecutionState)
case None => log.error("Cannot find execution graph for ID " +
s"${taskExecutionState.getJobID} to change state to " +
s"${taskExecutionState.getExecutionState}.")
sender ! false
}
}
......@@ -239,7 +237,7 @@ class JobManager(val flinkConfiguration: Configuration,
val execution = executionGraph.getRegisteredExecutions.get(executionAttempt)
if (execution == null) {
log.error("Can not find Execution for attempt {}.", executionAttempt)
log.error(s"Can not find Execution for attempt $executionAttempt.")
null
} else {
val slot = execution.getAssignedResource
......@@ -256,32 +254,30 @@ class JobManager(val flinkConfiguration: Configuration,
case splitAssigner: InputSplitAssigner =>
val nextInputSplit = splitAssigner.getNextInputSplit(host, taskId)
if (log.isDebugEnabled) {
log.debug("Send next input split {}.", nextInputSplit)
}
log.debug(s"Send next input split $nextInputSplit.")
try {
InstantiationUtil.serializeObject(nextInputSplit)
} catch {
case ex: Exception =>
log.error(ex, "Could not serialize the next input split of class {}.",
nextInputSplit.getClass)
log.error(s"Could not serialize the next input split of " +
s"class ${nextInputSplit.getClass}.", ex)
vertex.fail(new RuntimeException("Could not serialize the next input split " +
"of class " + nextInputSplit.getClass + ".", ex))
null
}
case _ =>
log.error("No InputSplitAssigner for vertex ID {}.", vertexID)
log.error(s"No InputSplitAssigner for vertex ID $vertexID.")
null
}
case _ =>
log.error("Cannot find execution vertex for vertex ID {}.", vertexID)
log.error(s"Cannot find execution vertex for vertex ID $vertexID.")
null
}
}
case None =>
log.error("Cannot find execution graph for job ID {}.", jobID)
log.error(s"Cannot find execution graph for job ID $jobID.")
null
}
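The explicit `if (log.isDebugEnabled)` guards (as in the hunk above) can be dropped because grizzled-slf4j declares its message parameter by-name, so the interpolated string is only built when the level is actually enabled. A minimal sketch of why that works:

```scala
import grizzled.slf4j.Logger

object LazyLoggingDemo {
  val log = Logger(getClass)

  def sendSplit(nextInputSplit: AnyRef): Unit = {
    // The message parameter is by-name (`msg: => Any`), so this
    // interpolation only runs when DEBUG is enabled; no explicit
    // isDebugEnabled guard is needed.
    log.debug(s"Send next input split $nextInputSplit.")
  }
}
```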
......@@ -290,9 +286,8 @@ class JobManager(val flinkConfiguration: Configuration,
case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
currentJobs.get(jobID) match {
case Some((executionGraph, jobInfo)) => executionGraph.getJobName
log.info("Status of job {} ({}) changed to {} {}.",
jobID, executionGraph.getJobName, newJobStatus,
if (error == null) "" else error.getMessage)
log.info(s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus" +
s" ${if (error == null) "" else error.getMessage}.")
if (newJobStatus.isTerminalState) {
jobInfo.end = timeStamp
......@@ -304,7 +299,7 @@ class JobManager(val flinkConfiguration: Configuration,
accumulatorManager.getJobAccumulatorResultsSerialized(jobID)
} catch {
case e: Exception =>
log.error(e, "Cannot fetch serialized accumulators for job {}", jobID)
log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
Collections.emptyMap()
}
val result = new SerializedJobExecutionResult(jobID, jobInfo.duration,
......@@ -352,8 +347,8 @@ class JobManager(val flinkConfiguration: Configuration,
sender ! Acknowledge
executionGraph.scheduleOrUpdateConsumers(partitionId)
case None =>
log.error("Cannot find execution graph for job ID {} to schedule or update consumers",
jobId)
log.error(s"Cannot find execution graph for job ID $jobId to schedule or update " +
s"consumers.")
sender ! Failure(new IllegalStateException("Cannot find execution graph for job ID " +
s"$jobId to schedule or update consumers."))
}
......@@ -383,7 +378,7 @@ class JobManager(val flinkConfiguration: Configuration,
sender ! RunningJobsStatus(jobs)
}
catch {
case t: Throwable => LOG.error("Exception while responding to RequestRunningJobsStatus", t)
case t: Throwable => log.error("Exception while responding to RequestRunningJobsStatus", t)
}
case RequestJob(jobID) =>
......@@ -403,10 +398,10 @@ class JobManager(val flinkConfiguration: Configuration,
case Heartbeat(instanceID, metricsReport) =>
try {
log.debug("Received hearbeat message from {}", instanceID)
log.debug(s"Received hearbeat message from $instanceID.")
instanceManager.reportHeartBeat(instanceID, metricsReport)
} catch {
case t: Throwable => log.error(t, "Could not report heart beat from {}.", sender().path)
case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t)
}
case message: AccumulatorMessage => handleAccumulatorMessage(message)
......@@ -417,7 +412,7 @@ class JobManager(val flinkConfiguration: Configuration,
case Terminated(taskManager) =>
if (instanceManager.isRegistered(taskManager)) {
log.info("Task manager {} terminated.", taskManager.path)
log.info(s"Task manager ${taskManager.path} terminated.")
instanceManager.unregisterTaskManager(taskManager)
context.unwatch(taskManager)
......@@ -430,7 +425,7 @@ class JobManager(val flinkConfiguration: Configuration,
val taskManager = sender()
if (instanceManager.isRegistered(taskManager)) {
log.info("Task manager {} wants to disconnect, because {}.", taskManager.path, msg)
log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
instanceManager.unregisterTaskManager(taskManager)
context.unwatch(taskManager)
......@@ -590,7 +585,7 @@ class JobManager(val flinkConfiguration: Configuration,
}
catch {
case tt: Throwable => {
log.error(tt, "Error while marking ExecutionGraph as failed.")
log.error("Error while marking ExecutionGraph as failed.", tt)
}
}
}
......@@ -618,7 +613,7 @@ class JobManager(val flinkConfiguration: Configuration,
libraryCacheManager.getClassLoader(jobId)
} catch {
case e: Exception =>
log.error(e, "Dropping accumulators. No class loader available for job " + jobId)
log.error("Dropping accumulators. No class loader available for job " + jobId, e)
return
}
......@@ -628,7 +623,7 @@ class JobManager(val flinkConfiguration: Configuration,
accumulatorManager.processIncomingAccumulators(jobId, accumulators)
}
catch {
case e: Exception => log.error(e, "Cannot update accumulators for job " + jobId)
case e: Exception => log.error("Cannot update accumulators for job " + jobId, e)
}
} else {
log.error("Dropping accumulators. No class loader available for job " + jobId)
......@@ -643,7 +638,7 @@ class JobManager(val flinkConfiguration: Configuration,
}
catch {
case e: Exception =>
log.error(e, "Cannot serialize accumulator result")
log.error("Cannot serialize accumulator result", e)
sender() ! AccumulatorResultsErroneous(jobID, e)
}
......@@ -656,7 +651,7 @@ class JobManager(val flinkConfiguration: Configuration,
}
catch {
case e: Exception =>
log.error(e, "Cannot fetch accumulator result")
log.error("Cannot fetch accumulator result", e)
sender() ! AccumulatorResultsErroneous(jobId, e)
}
......@@ -676,8 +671,8 @@ class JobManager(val flinkConfiguration: Configuration,
archive ! ArchiveExecutionGraph(jobID, eg)
} catch {
case t: Throwable => log.error(t, "Could not prepare the execution graph {} for " +
"archiving.", eg)
case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
"archiving.", t)
}
case None =>
......@@ -687,7 +682,7 @@ class JobManager(val flinkConfiguration: Configuration,
libraryCacheManager.unregisterJob(jobID)
} catch {
case t: Throwable =>
log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
log.error(s"Could not properly unregister job $jobID form the library cache.", t)
}
}
}
......@@ -699,7 +694,7 @@ class JobManager(val flinkConfiguration: Configuration,
*/
object JobManager {
val LOG = LoggerFactory.getLogger(classOf[JobManager])
val LOG = Logger(classOf[JobManager])
val STARTUP_FAILURE_RETURN_CODE = 1
val RUNTIME_FAILURE_RETURN_CODE = 2
......@@ -717,7 +712,7 @@ object JobManager {
*/
def main(args: Array[String]): Unit = {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager", args)
EnvironmentInformation.logEnvironmentInfo(LOG.logger, "JobManager", args)
EnvironmentInformation.checkJavaVersion()
// parsing the command line arguments
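Java-facing utilities such as `EnvironmentInformation.logEnvironmentInfo`, `SignalHandler.register`, and the `ProcessReaper` still expect a plain `org.slf4j.Logger`, which is why the calls above and below unwrap the grizzled wrapper via its `logger` field. A short sketch of the relationship:

```scala
import grizzled.slf4j.Logger
import org.apache.flink.runtime.jobmanager.JobManager

object LoggerUnwrap {
  val LOG = Logger(classOf[JobManager])    // Scala-friendly grizzled wrapper
  val raw: org.slf4j.Logger = LOG.logger   // underlying SLF4J logger for Java APIs
}
```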
......@@ -798,7 +793,7 @@ object JobManager {
LOG.info("Starting JobManager")
// Bring up the job manager actor system first, bind it to the given address.
LOG.info("Starting JobManager actor system at {}:{}", listeningAddress, listeningPort)
LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.")
val jobManagerSystem = try {
val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
......@@ -831,7 +826,7 @@ object JobManager {
// the process reaper will kill the JVM process (to ensure easy failure detection)
LOG.debug("Starting JobManager process reaper")
jobManagerSystem.actorOf(
Props(classOf[ProcessReaper], jobManager, LOG, RUNTIME_FAILURE_RETURN_CODE),
Props(classOf[ProcessReaper], jobManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
"JobManager_Process_Reaper")
// bring up a local task manager, if needed
......@@ -847,7 +842,7 @@ object JobManager {
LOG.debug("Starting TaskManager process reaper")
jobManagerSystem.actorOf(
Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
Props(classOf[ProcessReaper], taskManagerActor, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
"TaskManager_Process_Reaper")
}
......
......@@ -18,18 +18,21 @@
package org.apache.flink.runtime.jobmanager
import akka.actor.{ActorLogging, Actor}
import org.apache.flink.runtime.ActorLogMessages
import akka.actor.Actor
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
import org.apache.flink.runtime.profiling.impl.types.{InternalInstanceProfilingData, InternalExecutionVertexThreadProfilingData}
import org.apache.flink.runtime.profiling.types.ThreadProfilingEvent
import scala.collection.convert.WrapAsScala
/**
* Basic skeleton for the JobManager profiler. Currently, it simply logs the received messages.
*/
class JobManagerProfiler extends Actor with ActorLogMessages with ActorLogging with WrapAsScala {
class JobManagerProfiler
extends Actor
with ActorLogMessages
with ActorSynchronousLogging
with WrapAsScala {
override def receiveWithLogMessages: Receive = {
case ReportProfilingData(profilingContainer) =>
profilingContainer.getIterator foreach {
......
......@@ -18,9 +18,9 @@
package org.apache.flink.runtime.jobmanager
import akka.actor.{ActorLogging, Actor}
import akka.actor.Actor
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.executiongraph.ExecutionGraph
import org.apache.flink.runtime.messages.ArchiveMessages._
import org.apache.flink.runtime.messages.JobManagerMessages._
......@@ -48,8 +48,10 @@ import scala.ref.SoftReference
*
* @param max_entries Maximum number of stored Flink jobs
*/
class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
ActorLogging {
class MemoryArchivist(private val max_entries: Int)
extends Actor
with ActorLogMessages
with ActorSynchronousLogging {
/*
* Map of execution graphs belonging to recently started jobs with the time stamp of the last
* received job event. The insert order is preserved through a LinkedHashMap.
......
......@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
import java.lang.{Long => JLong}
import akka.actor._
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionVertex}
import org.apache.flink.runtime.jobgraph.JobStatus._
......@@ -69,7 +69,7 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
val interval: FiniteDuration,
var curId: JLong,
var ackId: JLong)
extends Actor with ActorLogMessages with ActorLogging {
extends Actor with ActorLogMessages with ActorSynchronousLogging {
implicit private val executor = context.dispatcher
......@@ -78,13 +78,13 @@ extends Actor with ActorLogMessages with ActorLogging {
case InitBarrierScheduler =>
context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
log.info("Started Stream State Monitor for job {}{}",
executionGraph.getJobID,executionGraph.getJobName)
log.info("Started Stream State Monitor for job " +
s"${executionGraph.getJobID}${executionGraph.getJobName}")
case BarrierTimeout =>
executionGraph.getState match {
case FAILED | CANCELED | FINISHED =>
log.info("Stopping monitor for terminated job {}", executionGraph.getJobID)
log.info(s"Stopping monitor for terminated job ${executionGraph.getJobID}.")
self ! PoisonPill
case RUNNING =>
curId += 1
......@@ -94,8 +94,8 @@ extends Actor with ActorLogMessages with ActorLogging {
=> vertex.getCurrentAssignedResource.getInstance.getTaskManager
! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
case _ =>
log.debug("Omitting sending barrier since graph is in {} state for job {}",
executionGraph.getState, executionGraph.getJobID)
log.debug("Omitting sending barrier since graph is in " +
s"${executionGraph.getState} state for job ${executionGraph.getJobID}.")
}
case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
......
......@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
import java.util.concurrent.TimeUnit
import akka.actor.{Cancellable, ActorRef, Actor, ActorLogging}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.execution.{RuntimeEnvironment, ExecutionState}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.jobgraph.JobVertexID
......@@ -44,7 +44,7 @@ import scala.concurrent.duration.FiniteDuration
* @param reportInterval Interval of profiling action
*/
class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
extends Actor with ActorLogMessages with ActorLogging {
extends Actor with ActorLogMessages with ActorSynchronousLogging {
import context.dispatcher
......@@ -101,7 +101,7 @@ class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
Some(instanceProfiler.generateProfilingData(timestamp))
} catch {
case e: ProfilingException =>
log.error(e, "Error while retrieving instance profiling data.")
log.error("Error while retrieving instance profiling data.", e)
None
}
......@@ -133,7 +133,7 @@ class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
case _ =>
}
case None =>
log.warning(s"Could not find environment for execution id $executionID.")
log.warn(s"Could not find environment for execution id $executionID.")
}
}
......
......@@ -23,7 +23,7 @@ import java.net.InetSocketAddress
import akka.actor._
import akka.pattern.ask
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
......@@ -34,8 +34,10 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Failure, Success}
class ApplicationClient(flinkConfig: Configuration) extends Actor
with ActorLogMessages with ActorLogging {
class ApplicationClient(flinkConfig: Configuration)
extends Actor
with ActorLogMessages
with ActorSynchronousLogging {
import context._
val INITIAL_POLLING_DELAY = 0 seconds
......@@ -78,8 +80,8 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor
jobManagerFuture.onComplete {
case Success(jm) => self ! JobManagerActorRef(jm)
case Failure(t) =>
log.error(t, "Registration at JobManager/ApplicationMaster failed. Shutting " +
"ApplicationClient down.")
log.error("Registration at JobManager/ApplicationMaster failed. Shutting " +
"ApplicationClient down.", t)
// we could not connect to the job manager --> poison ourselves
self ! PoisonPill
......@@ -93,7 +95,7 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor
// sender as the Application Client (this class).
(jm ? RegisterClient(self))(timeout).onFailure{
case t: Throwable =>
log.error(t, "Could not register at the job manager.")
log.error("Could not register at the job manager.", t)
self ! PoisonPill
}
......@@ -144,7 +146,7 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor
// ----------------- handle messages from the cluster -------------------
// receive remote messages
case msg: YarnMessage =>
log.debug("Received new YarnMessage {}. Now {} messages in queue", msg, messagesQueue.size)
log.debug(s"Received new YarnMessage $msg. Now ${messagesQueue.size} messages in queue")
messagesQueue.enqueue(msg)
// locally forward messages
......
......@@ -21,6 +21,7 @@ import java.io.{PrintWriter, FileWriter, BufferedWriter}
import java.security.PrivilegedAction
import akka.actor._
import grizzled.slf4j.Logger
import org.apache.flink.client.CliFrontend
import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
import org.apache.flink.runtime.akka.AkkaUtils
......@@ -38,7 +39,7 @@ import scala.io.Source
object ApplicationMaster {
import scala.collection.JavaConversions._
val LOG = LoggerFactory.getLogger(this.getClass)
val LOG = Logger(getClass)
val CONF_FILE = "flink-conf.yaml"
val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
......@@ -50,9 +51,9 @@ object ApplicationMaster {
LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
s"setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}")
EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster/JobManager", args)
EnvironmentInformation.logEnvironmentInfo(LOG.logger, "YARN ApplicationMaster/JobManager", args)
EnvironmentInformation.checkJavaVersion()
org.apache.flink.runtime.util.SignalHandler.register(LOG)
org.apache.flink.runtime.util.SignalHandler.register(LOG.logger)
val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
......@@ -206,7 +207,7 @@ object ApplicationMaster {
(Configuration, ActorSystem, ActorRef, ActorRef) = {
LOG.info("Starting JobManager for YARN")
LOG.info("Loading config from: {}", currDir)
LOG.info(s"Loading config from: $currDir.")
GlobalConfiguration.loadConfiguration(currDir)
val configuration = GlobalConfiguration.getConfiguration()
......
......@@ -98,7 +98,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
def receiveYarnMessages: Receive = {
case StopYarnSession(status, diag) =>
log.info("Stopping YARN JobManager with status {} and diagnostic {}.", status, diag)
log.info(s"Stopping YARN JobManager with status $status and diagnostic $diag.")
instanceManager.getAllRegisteredInstances.asScala foreach {
instance =>
......@@ -108,11 +108,11 @@ trait ApplicationMasterActor extends ActorLogMessages {
rmClientOption foreach {
rmClient =>
Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{
case t: Throwable => log.error(t, "Could not unregister the application master.")
case t: Throwable => log.error("Could not unregister the application master.", t)
}
Try(rmClient.close()).recover{
case t:Throwable => log.error(t, "Could not close the AMRMClient.")
case t:Throwable => log.error("Could not close the AMRMClient.", t)
}
}
......@@ -121,7 +121,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
nmClientOption foreach {
nmClient =>
Try(nmClient.close()).recover{
case t: Throwable => log.error(t, "Could not close the NMClient.")
case t: Throwable => log.error("Could not close the NMClient.", t)
}
}
......@@ -133,7 +133,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
context.system.shutdown()
case RegisterClient(client) =>
log.info("Register {} as client.", client.path)
log.info(s"Register ${client.path} as client.")
messageListener = Some(client)
sender ! Acknowledge
......@@ -142,7 +142,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
case msg: StopAMAfterJob =>
val jobId = msg.jobId
log.info("ApplicatonMaster will shut down YARN session when job {} has finished", jobId)
log.info(s"ApplicatonMaster will shut down YARN session when job $jobId has finished.")
stopWhenJobFinished = jobId
sender() ! Acknowledge
......@@ -155,26 +155,26 @@ trait ApplicationMasterActor extends ActorLogMessages {
startYarnSession(conf, actorSystemPort, webServerPort)
case jnf: JobNotFound =>
LOG.warn("Job with ID {} not found in JobManager", jnf.jobID)
log.warn(s"Job with ID ${jnf.jobID} not found in JobManager")
if(stopWhenJobFinished == null) {
LOG.warn("The ApplicationMaster didn't expect to receive this message")
log.warn("The ApplicationMaster didn't expect to receive this message")
}
case jobStatus: CurrentJobStatus =>
if(stopWhenJobFinished == null) {
LOG.warn("Received job status {} which wasn't requested", jobStatus)
log.warn(s"Received job status $jobStatus which wasn't requested.")
} else {
if(stopWhenJobFinished != jobStatus.jobID) {
LOG.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
s"job $stopWhenJobFinished")
} else {
if(jobStatus.status.isTerminalState) {
LOG.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
s"Shutting down YARN session")
self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
s"The monitored job with ID ${jobStatus.jobID} has finished.")
} else {
LOG.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
}
}
}
......@@ -229,14 +229,14 @@ trait ApplicationMasterActor extends ActorLogMessages {
})
}
// return containers if the RM wants them and we haven't allocated them yet.
val preemtionMessage = response.getPreemptionMessage
if(preemtionMessage != null) {
log.info("Received preemtion message from YARN {}", preemtionMessage)
val contract = preemtionMessage.getContract
val preemptionMessage = response.getPreemptionMessage
if(preemptionMessage != null) {
log.info(s"Received preemtion message from YARN $preemptionMessage.")
val contract = preemptionMessage.getContract
if(contract != null) {
tryToReturnContainers(contract.getContainers.asScala)
}
val strictContract = preemtionMessage.getStrictContract
val strictContract = preemptionMessage.getStrictContract
if(strictContract != null) {
tryToReturnContainers(strictContract.getContainers.asScala)
}
......@@ -247,13 +247,13 @@ trait ApplicationMasterActor extends ActorLogMessages {
// check if we want to start some of our allocated containers.
if(runningContainers < numTaskManager) {
var missingContainers = numTaskManager - runningContainers
log.info("The user requested {} containers, {} running. {} containers missing",
numTaskManager, runningContainers, missingContainers)
log.info(s"The user requested $numTaskManager containers, $runningContainers " +
s"running. $missingContainers containers missing")
// not enough containers running
if(allocatedContainersList.size > 0) {
log.info("{} containers already allocated by YARN. Starting...",
allocatedContainersList.size)
log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " +
"Starting...")
// we have some containers allocated to us --> start them
allocatedContainersList = allocatedContainersList.dropWhile(container => {
if (missingContainers <= 0) {
......@@ -279,7 +279,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
}
} catch {
case e: YarnException =>
log.error(e, "Exception while starting YARN container")
log.error("Exception while starting YARN container", e)
}
}
case None =>
......@@ -305,24 +305,24 @@ trait ApplicationMasterActor extends ActorLogMessages {
log.info(s"There are $missingContainers containers missing." +
s" $numPendingRequests are already requested. " +
s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
s"Reallocation of failed containers is enabled=$reallocate ('{}')",
ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS)
s"Reallocation of failed containers is enabled=$reallocate " +
s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
// there are still containers missing. Request them from YARN
if(reallocate) {
for(i <- 1 to toAllocateFromYarn) {
val containerRequest = getContainerRequest(memoryPerTaskManager)
rmClient.addContainerRequest(containerRequest)
numPendingRequests += 1
log.info("Requested additional container from YARN. Pending requests {}",
numPendingRequests)
log.info("Requested additional container from YARN. Pending requests " +
s"$numPendingRequests.")
}
}
}
}
if(runningContainers >= numTaskManager && allocatedContainersList.size > 0) {
log.info("Flink has {} allocated containers which are not needed right now. " +
"Returning them", allocatedContainersList.size)
log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
s"are not needed right now. Returning them")
for(container <- allocatedContainersList) {
rmClient.releaseAssignedContainer(container.getId)
}
......@@ -353,9 +353,9 @@ trait ApplicationMasterActor extends ActorLogMessages {
self ! StopYarnSession(FinalApplicationStatus.FAILED, "Fatal error in AM: AMRMClient " +
"was not set")
}
log.debug("Processed Heartbeat with RMClient. Running containers {}," +
"failed containers {}, allocated containers {}", runningContainers, failedContainers,
allocatedContainersList.size)
log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers, " +
s"failed containers $failedContainers, " +
s"allocated containers ${allocatedContainersList.size}.")
}
private def runningContainerIds(): mutable.MutableList[ContainerId] = {
......@@ -381,17 +381,16 @@ trait ApplicationMasterActor extends ActorLogMessages {
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS), MILLISECONDS)
if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) {
log.warning("The heartbeat interval of the Flink Application master ({}) is greater than " +
"YARN's expiry interval ({}). The application is likely to be killed by YARN.",
YARN_HEARTBEAT_DELAY, yarnExpiryInterval)
log.warn(s"The heartbeat interval of the Flink Application master " +
s"($YARN_HEARTBEAT_DELAY) is greater than YARN's expiry interval " +
s"($yarnExpiryInterval). The application is likely to be killed by YARN.")
}
numTaskManager = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
maxFailedContainers = flinkConfiguration.
getInteger(ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numTaskManager)
log.info("Requesting {} TaskManagers. Tolerating {} failed TaskManagers",
numTaskManager, maxFailedContainers)
log.info(s"Requesting $numTaskManager TaskManagers. Tolerating $maxFailedContainers failed " +
"TaskManagers")
val remoteFlinkJarPath = env.get(FlinkYarnClient.FLINK_JAR_PATH)
val fs = FileSystem.get(conf)
......@@ -468,7 +467,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn)
} recover {
case t: Throwable =>
log.error(t, "Could not start yarn session.")
log.error("Could not start yarn session.", t)
self ! StopYarnSession(FinalApplicationStatus.FAILED,
s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}")
}
......@@ -479,7 +478,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
allocatedContainersList = allocatedContainersList.dropWhile( container => {
val result = requestedBackContainers.getId.equals(container.getId)
if(result) {
log.info("Returning container {} back to ResourceManager.", container)
log.info(s"Returning container $container back to ResourceManager.")
}
result
})
......@@ -548,7 +547,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
ctx.setTokens(securityTokens)
} catch {
case t: Throwable =>
log.error(t, "Getting current user info failed when trying to launch the container")
log.error("Getting current user info failed when trying to launch the container", t)
}
ctx
......
......@@ -274,6 +274,12 @@ under the License.
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.clapper</groupId>
<artifactId>grizzled-slf4j_${scala.binary.version}</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.binary.version}</artifactId>
......