diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE index d66922cd2085d8496426f22c486a89abb866a85b..12363818406f0a4d0542742fd26ac69e8fe90d70 100644 --- a/flink-dist/src/main/flink-bin/LICENSE +++ b/flink-dist/src/main/flink-bin/LICENSE @@ -308,6 +308,7 @@ BSD-style licenses: - Scala Compiler Reflect (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc. - Scala Quasiquotes (http://scalamacros.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc. - ASM (http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom + - Grizzled SLF4J (http://software.clapper.org/grizzled-slf4j/) - Copyright (c) 2010 Brian M. Clapper (Below is the 3-clause BSD license) diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index f490ed97d1b5cfae98e6461fc588efa557616c1a..56ed0d2b803cca45bb13deafc684f8b873e5b497 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -147,6 +147,11 @@ under the License. akka-testkit_${scala.binary.version} + <dependency> + <groupId>org.clapper</groupId> + <artifactId>grizzled-slf4j_${scala.binary.version}</artifactId> + </dependency> + org.scalatest scalatest_${scala.binary.version} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index f6eb9075ba9194b9064747078bd52f7172b3c8ea..e6eee5be34dcf1f028eb280e6549c541517e72ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -30,7 +30,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.messages.ExecutionGraphMessages; -import org.apache.flink.runtime.messages.TaskMessages; import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask; import org.apache.flink.runtime.profiling.TaskManagerProfiler; import org.apache.flink.util.ExceptionUtils; @@ -357,16 +356,6 @@ public class Task { taskManager.tell(new UnregisterTask(executionId), ActorRef.noSender()); } - protected void notifyExecutionStateChange(ExecutionState executionState, - Throwable optionalError) { - LOG.info("Update execution state of {} ({}) to {}.", this.getTaskName(), - this.getExecutionId(), executionState); - taskManager.tell(new TaskMessages.UpdateTaskExecutionState( - new TaskExecutionState(jobId, executionId, executionState, optionalError)), - ActorRef.noSender()); - - } - // ----------------------------------------------------------------------------------------------------------------- // Task Profiling // ----------------------------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala index acd434622c5b08b3d92988b1ec2d7a59c8a0b3f8..c74c3398d1ec3516bcf4ccba1cd67a2d35e09e41 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala @@ -19,13 +19,12 @@ package org.apache.flink.runtime import _root_.akka.actor.Actor -import _root_.akka.event.LoggingAdapter /** * Mixin to add debug message logging */ trait ActorLogMessages { - that: Actor => + that: Actor with ActorSynchronousLogging => override def receive: Receive = new
Actor.Receive { private val _receiveWithLogMessages = receiveWithLogMessages @@ -50,6 +49,4 @@ trait ActorLogMessages { } def receiveWithLogMessages: Receive - - protected def log: LoggingAdapter } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala new file mode 100644 index 0000000000000000000000000000000000000000..4d3a988b577775c40fecf19c6070903f9fe24c99 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime + +import _root_.akka.actor.Actor +import grizzled.slf4j.Logger + +/** Adds a logger to an [[akka.actor.Actor]] implementation + * + */ +trait ActorSynchronousLogging { + self: Actor => + + lazy val log = Logger(getClass) +} diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 6870d9510d47814e3d224965226af4336ae1a4f4..83f9e3571b6ec57579a25996d2653240a7a54de3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -23,6 +23,7 @@ import java.net.InetSocketAddress import java.util.Collections import akka.actor.Status.{Success, Failure} +import grizzled.slf4j.Logger import org.apache.flink.api.common.{JobID, ExecutionConfig} import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration} import org.apache.flink.core.io.InputSplitAssigner @@ -42,7 +43,7 @@ import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation} -import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager @@ -101,10 +102,7 @@ class JobManager(val flinkConfiguration: Configuration, val defaultExecutionRetries: Int, val delayBetweenRetries: Long, val timeout: FiniteDuration) - extends Actor with ActorLogMessages with ActorLogging { - - /** Reference to the log, for debugging */ - val LOG = JobManager.LOG + extends Actor with ActorLogMessages with ActorSynchronousLogging { /** List of current jobs running jobs */ val currentJobs = 
scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() @@ -114,7 +112,7 @@ class JobManager(val flinkConfiguration: Configuration, * Run when the job manager is started. Simply logs an informational message. */ override def preStart(): Unit = { - LOG.info(s"Starting JobManager at ${self.path.toSerializationFormat}.") + log.info(s"Starting JobManager at ${self.path.toSerializationFormat}.") } override def postStop(): Unit = { @@ -138,12 +136,11 @@ class JobManager(val flinkConfiguration: Configuration, try { libraryCacheManager.shutdown() } catch { - case e: IOException => log.error(e, "Could not properly shutdown the library cache manager.") + case e: IOException => log.error("Could not properly shutdown the library cache manager.", e) } - if (log.isDebugEnabled) { - log.debug("Job manager {} is completely stopped.", self.path) - } + log.debug(s"Job manager ${self.path} is completely stopped.") + } /** @@ -179,7 +176,7 @@ class JobManager(val flinkConfiguration: Configuration, // registerTaskManager throws an IllegalStateException if it is already shut down // let the actor crash and restart itself in this case case e: Exception => - log.error(e, "Failed to register TaskManager at instance manager") + log.error("Failed to register TaskManager at instance manager", e) // IMPORTANT: Send the response to the "sender", which is not the // TaskManager actor, but the ask future! @@ -197,7 +194,7 @@ class JobManager(val flinkConfiguration: Configuration, submitJob(jobGraph, listenToEvents = listen) case CancelJob(jobID) => - log.info("Trying to cancel job with ID {}.", jobID) + log.info(s"Trying to cancel job with ID $jobID.") currentJobs.get(jobID) match { case Some((executionGraph, _)) => @@ -208,7 +205,7 @@ class JobManager(val flinkConfiguration: Configuration, sender ! CancellationSuccess(jobID) case None => - log.info("No job found with ID {}.", jobID) + log.info(s"No job found with ID $jobID.") sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " + s"ID $jobID.")) } @@ -227,8 +224,9 @@ class JobManager(val flinkConfiguration: Configuration, }(context.dispatcher) sender ! true - case None => log.error("Cannot find execution graph for ID {} to change state to {}.", - taskExecutionState.getJobID, taskExecutionState.getExecutionState) + case None => log.error("Cannot find execution graph for ID " + + s"${taskExecutionState.getJobID} to change state to " + + s"${taskExecutionState.getExecutionState}.") sender ! 
false } } @@ -239,7 +237,7 @@ class JobManager(val flinkConfiguration: Configuration, val execution = executionGraph.getRegisteredExecutions.get(executionAttempt) if (execution == null) { - log.error("Can not find Execution for attempt {}.", executionAttempt) + log.error(s"Cannot find Execution for attempt $executionAttempt.") null } else { val slot = execution.getAssignedResource @@ -256,32 +254,30 @@ class JobManager(val flinkConfiguration: Configuration, case splitAssigner: InputSplitAssigner => val nextInputSplit = splitAssigner.getNextInputSplit(host, taskId) - if (log.isDebugEnabled) { - log.debug("Send next input split {}.", nextInputSplit) - } + log.debug(s"Send next input split $nextInputSplit.") try { InstantiationUtil.serializeObject(nextInputSplit) } catch { case ex: Exception => - log.error(ex, "Could not serialize the next input split of class {}.", - nextInputSplit.getClass) + log.error(s"Could not serialize the next input split of " + + s"class ${nextInputSplit.getClass}.", ex) vertex.fail(new RuntimeException("Could not serialize the next input split " + "of class " + nextInputSplit.getClass + ".", ex)) null } case _ => - log.error("No InputSplitAssigner for vertex ID {}.", vertexID) + log.error(s"No InputSplitAssigner for vertex ID $vertexID.") null } case _ => - log.error("Cannot find execution vertex for vertex ID {}.", vertexID) + log.error(s"Cannot find execution vertex for vertex ID $vertexID.") null } } case None => - log.error("Cannot find execution graph for job ID {}.", jobID) + log.error(s"Cannot find execution graph for job ID $jobID.") null } @@ -290,9 +286,8 @@ class JobManager(val flinkConfiguration: Configuration, case JobStatusChanged(jobID, newJobStatus, timeStamp, error) => currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => executionGraph.getJobName - log.info("Status of job {} ({}) changed to {} {}.", - jobID, executionGraph.getJobName, newJobStatus, - if (error == null) "" else error.getMessage) + log.info(s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus" + + s" ${if (error == null) "" else error.getMessage}.") if (newJobStatus.isTerminalState) { jobInfo.end = timeStamp @@ -304,7 +299,7 @@ class JobManager(val flinkConfiguration: Configuration, accumulatorManager.getJobAccumulatorResultsSerialized(jobID) } catch { case e: Exception => - log.error(e, "Cannot fetch serialized accumulators for job {}", jobID) + log.error(s"Cannot fetch serialized accumulators for job $jobID", e) Collections.emptyMap() } val result = new SerializedJobExecutionResult(jobID, jobInfo.duration, @@ -352,8 +347,8 @@ class JobManager(val flinkConfiguration: Configuration, sender ! Acknowledge executionGraph.scheduleOrUpdateConsumers(partitionId) case None => - log.error("Cannot find execution graph for job ID {} to schedule or update consumers", - jobId) + log.error(s"Cannot find execution graph for job ID $jobId to schedule or update " + + s"consumers.") sender ! Failure(new IllegalStateException("Cannot find execution graph for job ID " + s"$jobId to schedule or update consumers.")) } @@ -383,7 +378,7 @@ class JobManager(val flinkConfiguration: Configuration, sender !
RunningJobsStatus(jobs) } catch { - case t: Throwable => LOG.error("Exception while responding to RequestRunningJobsStatus", t) + case t: Throwable => log.error("Exception while responding to RequestRunningJobsStatus", t) } case RequestJob(jobID) => @@ -403,10 +398,10 @@ case Heartbeat(instanceID, metricsReport) => try { - log.debug("Received hearbeat message from {}", instanceID) + log.debug(s"Received heartbeat message from $instanceID.") instanceManager.reportHeartBeat(instanceID, metricsReport) } catch { - case t: Throwable => log.error(t, "Could not report heart beat from {}.", sender().path) + case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t) } case message: AccumulatorMessage => handleAccumulatorMessage(message) @@ -417,7 +412,7 @@ case Terminated(taskManager) => if (instanceManager.isRegistered(taskManager)) { - log.info("Task manager {} terminated.", taskManager.path) + log.info(s"Task manager ${taskManager.path} terminated.") instanceManager.unregisterTaskManager(taskManager) context.unwatch(taskManager) @@ -430,7 +425,7 @@ val taskManager = sender() if (instanceManager.isRegistered(taskManager)) { - log.info("Task manager {} wants to disconnect, because {}.", taskManager.path, msg) + log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.") instanceManager.unregisterTaskManager(taskManager) context.unwatch(taskManager) @@ -590,7 +585,7 @@ } catch { case tt: Throwable => { - log.error(tt, "Error while marking ExecutionGraph as failed.") + log.error("Error while marking ExecutionGraph as failed.", tt) } } } @@ -618,7 +613,7 @@ libraryCacheManager.getClassLoader(jobId) } catch { case e: Exception => - log.error(e, "Dropping accumulators. No class loader available for job " + jobId) + log.error("Dropping accumulators. No class loader available for job " + jobId, e) return } @@ -628,7 +623,7 @@ accumulatorManager.processIncomingAccumulators(jobId, accumulators) } catch { - case e: Exception => log.error(e, "Cannot update accumulators for job " + jobId) + case e: Exception => log.error("Cannot update accumulators for job " + jobId, e) } } else { log.error("Dropping accumulators. No class loader available for job " + jobId) @@ -643,7 +638,7 @@ } catch { case e: Exception => - log.error(e, "Cannot serialize accumulator result") + log.error("Cannot serialize accumulator result", e) sender() ! AccumulatorResultsErroneous(jobID, e) } @@ -656,7 +651,7 @@ } catch { case e: Exception => - log.error(e, "Cannot fetch accumulator result") + log.error("Cannot fetch accumulator result", e) sender() ! AccumulatorResultsErroneous(jobId, e) } @@ -676,8 +671,8 @@ archive !
ArchiveExecutionGraph(jobID, eg) } catch { - case t: Throwable => log.error(t, "Could not prepare the execution graph {} for " + - "archiving.", eg) + case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " + + "archiving.", t) case None => @@ -687,7 +682,7 @@ libraryCacheManager.unregisterJob(jobID) } catch { case t: Throwable => - log.error(t, "Could not properly unregister job {} form the library cache.", jobID) + log.error(s"Could not properly unregister job $jobID from the library cache.", t) } } } @@ -699,7 +694,7 @@ */ object JobManager { - val LOG = LoggerFactory.getLogger(classOf[JobManager]) + val LOG = Logger(classOf[JobManager]) val STARTUP_FAILURE_RETURN_CODE = 1 val RUNTIME_FAILURE_RETURN_CODE = 2 @@ -717,7 +712,7 @@ object JobManager { */ def main(args: Array[String]): Unit = { // startup checks and logging - EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager", args) + EnvironmentInformation.logEnvironmentInfo(LOG.logger, "JobManager", args) EnvironmentInformation.checkJavaVersion() // parsing the command line arguments @@ -798,7 +793,7 @@ object JobManager { LOG.info("Starting JobManager") // Bring up the job manager actor system first, bind it to the given address. - LOG.info("Starting JobManager actor system at {}:{}", listeningAddress, listeningPort) + LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.") val jobManagerSystem = try { val akkaConfig = AkkaUtils.getAkkaConfig(configuration, @@ -831,7 +826,7 @@ object JobManager { // the process reaper will kill the JVM process (to ensure easy failure detection) LOG.debug("Starting JobManager process reaper") jobManagerSystem.actorOf( - Props(classOf[ProcessReaper], jobManager, LOG, RUNTIME_FAILURE_RETURN_CODE), + Props(classOf[ProcessReaper], jobManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE), "JobManager_Process_Reaper") // bring up a local task manager, if needed @@ -847,7 +842,7 @@ object JobManager { LOG.debug("Starting TaskManager process reaper") jobManagerSystem.actorOf( - Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE), + Props(classOf[ProcessReaper], taskManagerActor, LOG.logger, RUNTIME_FAILURE_RETURN_CODE), "TaskManager_Process_Reaper") } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala index eb8f913a342a5ffcab1b070b1ea1508ea5f82eb3..dd3a1b7cd01586745bfbabb752aedb0a22ab3a57 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala @@ -18,18 +18,21 @@ package org.apache.flink.runtime.jobmanager -import akka.actor.{ActorLogging, Actor} -import org.apache.flink.runtime.ActorLogMessages +import akka.actor.Actor +import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData import org.apache.flink.runtime.profiling.impl.types.{InternalInstanceProfilingData, InternalExecutionVertexThreadProfilingData} -import org.apache.flink.runtime.profiling.types.ThreadProfilingEvent import scala.collection.convert.WrapAsScala /** * Basic skeleton for the JobManager profiler. Currently, it simply logs the received messages.
*/ -class JobManagerProfiler extends Actor with ActorLogMessages with ActorLogging with WrapAsScala { +class JobManagerProfiler + extends Actor + with ActorLogMessages + with ActorSynchronousLogging + with WrapAsScala { override def receiveWithLogMessages: Receive = { case ReportProfilingData(profilingContainer) => profilingContainer.getIterator foreach { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index cb7bfec35c32856f5159e47b7ec97ba093e61e8e..9e71ebbcc4c7ce901baf93b061a4bb27dfe5f9e4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -18,9 +18,9 @@ package org.apache.flink.runtime.jobmanager -import akka.actor.{ActorLogging, Actor} +import akka.actor.Actor import org.apache.flink.api.common.JobID -import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.executiongraph.ExecutionGraph import org.apache.flink.runtime.messages.ArchiveMessages._ import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -48,8 +48,10 @@ import scala.ref.SoftReference * * @param max_entries Maximum number of stored Flink jobs */ -class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with -ActorLogging { +class MemoryArchivist(private val max_entries: Int) + extends Actor + with ActorLogMessages + with ActorSynchronousLogging { /* * Map of execution graphs belonging to recently started jobs with the time stamp of the last * received job event. The insert order is preserved through a LinkedHashMap. 
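For readers following the migration: every hunk above and below applies the same pattern, enabled by the two traits introduced earlier (the tightened self-type on `ActorLogMessages` and the new `ActorSynchronousLogging`). A minimal sketch of how they compose — illustrative only, not part of the patch; `SampleActor` is hypothetical:

```scala
import akka.actor.Actor
import org.apache.flink.runtime.{ActorLogMessages, ActorSynchronousLogging}

// The self-type `that: Actor with ActorSynchronousLogging` guarantees that any
// actor mixing in ActorLogMessages also carries the synchronous grizzled-slf4j
// logger, so `log` is available without Akka's asynchronous LoggingAdapter.
class SampleActor extends Actor with ActorLogMessages with ActorSynchronousLogging {
  override def receiveWithLogMessages: Receive = {
    case msg =>
      // grizzled-slf4j takes the message by name, so this interpolated string
      // is only built when DEBUG is enabled -- which is why the patch can drop
      // the explicit `if (log.isDebugEnabled)` guards seen in the old code.
      log.debug(s"Received $msg.")
  }
}
```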
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala index 48266e21872b00b1bf67ad2b308f014dbaecf973..8bb1274c3e32c661bb2913d0d4917326f34e0019 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager import java.lang.{Long => JLong} import akka.actor._ -import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionVertex} import org.apache.flink.runtime.jobgraph.JobStatus._ @@ -69,7 +69,7 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph, val interval: FiniteDuration, var curId: JLong, var ackId: JLong) -extends Actor with ActorLogMessages with ActorLogging { +extends Actor with ActorLogMessages with ActorSynchronousLogging { implicit private val executor = context.dispatcher @@ -78,13 +78,13 @@ extends Actor with ActorLogMessages with ActorLogging { case InitBarrierScheduler => context.system.scheduler.schedule(interval,interval,self,BarrierTimeout) context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate) - log.info("Started Stream State Monitor for job {}{}", - executionGraph.getJobID,executionGraph.getJobName) + log.info("Started Stream State Monitor for job " + + s"${executionGraph.getJobID} ${executionGraph.getJobName}") case BarrierTimeout => executionGraph.getState match { case FAILED | CANCELED | FINISHED => - log.info("Stopping monitor for terminated job {}", executionGraph.getJobID) + log.info(s"Stopping monitor for terminated job ${executionGraph.getJobID}.") self ! PoisonPill case RUNNING => curId += 1 @@ -94,8 +94,8 @@ extends Actor with ActorLogMessages with ActorLogging { => vertex.getCurrentAssignedResource.getInstance.getTaskManager !
BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId)) case _ => - log.debug("Omitting sending barrier since graph is in {} state for job {}", - executionGraph.getState, executionGraph.getJobID) + log.debug("Omitting sending barrier since graph is in " + + s"${executionGraph.getState} state for job ${executionGraph.getJobID}.") } case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) => diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index d6b91ec9c0310d8ee5532bbb4c43e2385c9ce032..511de6b21178606cbca7a9e9ff3d731c4c482ef8 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -33,11 +33,12 @@ import com.codahale.metrics.json.MetricsModule import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet} import com.fasterxml.jackson.databind.ObjectMapper +import grizzled.slf4j.Logger import org.apache.flink.api.common.cache.DistributedCache import org.apache.flink.configuration._ import org.apache.flink.core.fs.Path -import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.blob.{BlobService, BlobCache} import org.apache.flink.runtime.broadcast.BroadcastVariableManager @@ -67,8 +68,6 @@ import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation} import org.apache.flink.util.ExceptionUtils -import org.slf4j.LoggerFactory - import scala.concurrent._ import scala.concurrent.duration._ import scala.util.{Failure, Success} @@ -125,10 +124,7 @@ class TaskManager(protected val config: TaskManagerConfiguration, protected val network: NetworkEnvironment, protected val numberOfSlots: Int) -extends Actor with ActorLogMessages with ActorLogging { - - /** The log for all synchronous logging calls */ - private val LOG = TaskManager.LOG +extends Actor with ActorLogMessages with ActorSynchronousLogging { /** The timeout for all actor ask futures */ protected val askTimeout = new Timeout(config.timeout) @@ -178,13 +174,13 @@ extends Actor with ActorLogMessages with ActorLogging { * JobManager. */ override def preStart(): Unit = { - LOG.info("Starting TaskManager actor at {}.", self.path.toSerializationFormat) - LOG.info("TaskManager data connection information: {}", connectionInfo) - LOG.info("TaskManager has {} task slot(s).", numberOfSlots) + log.info(s"Starting TaskManager actor at ${self.path.toSerializationFormat}.") + log.info(s"TaskManager data connection information: $connectionInfo") + log.info(s"TaskManager has $numberOfSlots task slot(s).") // log the initial memory utilization - if (LOG.isInfoEnabled) { - LOG.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean)) + if (log.isInfoEnabled) { + log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean)) } // kick off the registration @@ -200,7 +196,7 @@ extends Actor with ActorLogMessages with ActorLogging { * (like network stack, library cache, memory manager, ...) are properly shut down. 
*/ override def postStop(): Unit = { - LOG.info("Stopping TaskManager {}.", self.path.toSerializationFormat) + log.info(s"Stopping TaskManager ${self.path.toSerializationFormat}.") cancelAndClearEverything(new Exception("TaskManager is shutting down.")) @@ -208,35 +204,35 @@ extends Actor with ActorLogMessages with ActorLogging { try { disassociateFromJobManager() } catch { - case t: Exception => LOG.error("Could not cleanly disassociate from JobManager", t) + case t: Exception => log.error("Could not cleanly disassociate from JobManager", t) } } try { ioManager.shutdown() } catch { - case t: Exception => LOG.error("I/O manager did not shutdown properly.", t) + case t: Exception => log.error("I/O manager did not shutdown properly.", t) } try { memoryManager.shutdown() } catch { - case t: Exception => LOG.error("Memory manager did not shutdown properly.", t) + case t: Exception => log.error("Memory manager did not shutdown properly.", t) } try { network.shutdown() } catch { - case t: Exception => LOG.error("Network environment did not shutdown properly.", t) + case t: Exception => log.error("Network environment did not shutdown properly.", t) } try { fileCache.shutdown() } catch { - case t: Exception => LOG.error("FileCache did not shutdown properly.", t) + case t: Exception => log.error("FileCache did not shutdown properly.", t) } - LOG.info("Task manager {} is completely shut down.", self.path) + log.info(s"Task manager ${self.path} is completely shut down.") } /** @@ -277,8 +273,8 @@ extends Actor with ActorLogMessages with ActorLogging { handleJobManagerDisconnect(sender(), "JobManager is no longer reachable") } else { - LOG.warn("Received unrecognized disconnect message from {}", - if (actor == null) null else actor.path) + log.warn(s"Received unrecognized disconnect message " + + s"from ${if (actor == null) null else actor.path}.") } case Disconnect(msg) => @@ -291,7 +287,7 @@ extends Actor with ActorLogMessages with ActorLogging { override def unhandled(message: Any): Unit = { val errorMessage = "Received unknown message " + message val error = new RuntimeException(errorMessage) - LOG.error(errorMessage) + log.error(errorMessage) // terminate all we are currently running (with a dedicated message) // before the actor is stopped @@ -311,8 +307,8 @@ extends Actor with ActorLogMessages with ActorLogging { // at very first, check that we are actually currently associated with a JobManager if (!isConnected) { - LOG.debug("Dropping message {} because the TaskManager is currently " + - "not connected to a JobManager", message) + log.debug(s"Dropping message $message because the TaskManager is currently " + + "not connected to a JobManager.") } // we order the messages by frequency, to make sure the code paths for matching @@ -329,7 +325,7 @@ extends Actor with ActorLogMessages with ActorLogging { // discards intermediate result partitions of a task execution on this TaskManager case FailIntermediateResultPartitions(executionID) => - LOG.info("Discarding the results produced by task execution " + executionID) + log.info("Discarding the results produced by task execution " + executionID) if (network.isAssociated) { try { network.getPartitionManager.releasePartitionsProducedBy(executionID) @@ -392,7 +388,7 @@ extends Actor with ActorLogMessages with ActorLogging { Future { task.failExternally(cause) }.onFailure{ - case t: Throwable => LOG.error(s"Could not fail task ${task} externally.", t) + case t: Throwable => log.error(s"Could not fail task ${task} externally.", t) } case None => } @@ 
-406,7 +402,7 @@ extends Actor with ActorLogMessages with ActorLogging { Future { task.cancelExecution() }.onFailure{ - case t: Throwable => LOG.error("Could not cancel task " + task, t) + case t: Throwable => log.error("Could not cancel task " + task, t) } sender ! new TaskOperationResult(executionID, true) @@ -426,10 +422,9 @@ extends Actor with ActorLogMessages with ActorLogging { private def handleCheckpointingMessage(message: CheckpointingMessage): Unit = { message match { - case BarrierReq(attemptID, checkpointID) => - LOG.debug("[FT-TaskManager] Barrier {} request received for attempt {}", - checkpointID, attemptID) + log.debug(s"[FT-TaskManager] Barrier $checkpointID request received " + + s"for attempt $attemptID.") runningTasks.get(attemptID) match { case Some(i) => @@ -441,16 +436,14 @@ extends Actor with ActorLogMessages with ActorLogging { barrierTransceiver.broadcastBarrierFromSource(checkpointID) }).start() - case _ => LOG.error( - "Taskmanager received a checkpoint request for non-checkpointing task {}", - attemptID) + case _ => log.error("TaskManager received a checkpoint request for " + + s"non-checkpointing task $attemptID.") } } case None => // may always happen in case of canceled/finished tasks - LOG.debug("Taskmanager received a checkpoint request for unknown task {}", - attemptID) + log.debug(s"TaskManager received a checkpoint request for unknown task $attemptID.") // unknown checkpoint message @@ -474,19 +467,19 @@ extends Actor with ActorLogMessages with ActorLogging { if (isConnected) { // this may be the case, if we queue another attempt and // in the meantime, the registration is acknowledged - LOG.debug( + log.debug( "TaskManager was triggered to register at JobManager, but is already registered") } else if (deadline.exists(_.isOverdue())) { // we failed to register in time. that means we should quit - LOG.error("Failed to register at the JobManager withing the defined maximum " + + log.error("Failed to register at the JobManager within the defined maximum " + "connect time. Shutting down ...") // terminate ourselves (hasta la vista) self !
PoisonPill } else { - LOG.info(s"Trying to register at JobManager ${jobManagerURL} " + + log.info(s"Trying to register at JobManager ${jobManagerURL} " + s"(attempt ${attempt}, timeout: ${timeout})") val jobManager = context.actorSelection(jobManagerAkkaURL) @@ -510,9 +503,9 @@ extends Actor with ActorLogMessages with ActorLogging { case AcknowledgeRegistration(jobManager, id, blobPort) => if (isConnected) { if (jobManager == currentJobManager.orNull) { - LOG.debug("Ignoring duplicate registration acknowledgement.") + log.debug("Ignoring duplicate registration acknowledgement.") } else { - LOG.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " + + log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " + s"because the TaskManager is already registered at ${currentJobManager.orNull}") } } @@ -531,15 +524,15 @@ extends Actor with ActorLogMessages with ActorLogging { case AlreadyRegistered(jobManager, id, blobPort) => if (isConnected) { if (jobManager == currentJobManager.orNull) { - LOG.debug("Ignoring duplicate registration acknowledgement.") + log.debug("Ignoring duplicate registration acknowledgement.") } else { - LOG.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " + + log.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " + s"even through TaskManager is currently registered at ${currentJobManager.orNull}") } } else { // not connected, yet, to let's associate - LOG.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'") + log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'") try { associateWithJobManager(jobManager, id, blobPort) @@ -552,7 +545,7 @@ extends Actor with ActorLogMessages with ActorLogging { case RefuseRegistration(reason) => if (currentJobManager.isEmpty) { - LOG.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " + + log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " + s"because: ${reason}. 
Retrying later...") // try the registration again after some time @@ -570,11 +563,11 @@ extends Actor with ActorLogMessages with ActorLogging { else { // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration if (sender() == currentJobManager.orNull) { - LOG.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" + + log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" + s" even though this TaskManager is already registered there.") } else { - LOG.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})") + log.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})") } } @@ -621,7 +614,7 @@ extends Actor with ActorLogMessages with ActorLogging { // sanity check that we are not currently registered with a different JobManager if (isConnected) { if (currentJobManager.get == jobManager) { - LOG.warn("Received call to finish registration with JobManager " + + log.warn("Received call to finish registration with JobManager " + jobManager.path + " even though TaskManager is already registered.") return } @@ -633,8 +626,8 @@ extends Actor with ActorLogMessages with ActorLogging { } // not yet associated, so associate - LOG.info("Successful registration at JobManager ({}), " + - "starting network stack and library cache.", jobManager.path) + log.info(s"Successful registration at JobManager (${jobManager.path}), " + + "starting network stack and library cache.") // sanity check that the JobManager dependent components are not set up currently if (network.isAssociated || blobService.isDefined) { @@ -648,7 +641,7 @@ extends Actor with ActorLogMessages with ActorLogging { catch { case e: Exception => val message = "Could not start network environment." - LOG.error(message, e) + log.error(message, e) throw new RuntimeException(message, e) } @@ -657,7 +650,7 @@ extends Actor with ActorLogMessages with ActorLogging { val jmHost = jobManager.path.address.host.getOrElse("localhost") val address = new InetSocketAddress(jmHost, blobPort) - LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address) + log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.") try { val blobcache = new BlobCache(address, config.configuration) @@ -667,7 +660,7 @@ extends Actor with ActorLogMessages with ActorLogging { catch { case e: Exception => val message = "Could not create BLOB cache or library cache." 
- LOG.error(message, e) + log.error(message, e) throw new RuntimeException(message, e) } } @@ -700,12 +693,12 @@ extends Actor with ActorLogMessages with ActorLogging { */ private def disassociateFromJobManager(): Unit = { if (!isConnected) { - LOG.warn("TaskManager received message to disassociate from JobManager, even though " + + log.warn("TaskManager received message to disassociate from JobManager, even though " + "it is not currently associated with a JobManager") return } - LOG.info("Disassociating from JobManager") + log.info("Disassociating from JobManager") // stop the periodic heartbeats heartbeatScheduler foreach { @@ -748,7 +741,7 @@ extends Actor with ActorLogMessages with ActorLogging { if (jobManager == currentJobManager.orNull) { try { val message = "Disconnecting from JobManager: " + msg - LOG.info(message) + log.info(message) // cancel all our tasks with a proper error message cancelAndClearEverything(new Exception(message)) @@ -769,7 +762,7 @@ extends Actor with ActorLogMessages with ActorLogging { } } else { - LOG.warn("Received erroneous JobManager disconnect message for {}", jobManager.path) + log.warn(s"Received erroneous JobManager disconnect message for ${jobManager.path}.") } } } @@ -807,17 +800,15 @@ extends Actor with ActorLogMessages with ActorLogging { val userCodeClassLoader = libraryCacheManager match { case Some(manager) => - if (LOG.isDebugEnabled) { + if (log.isDebugEnabled) { startRegisteringTask = System.currentTimeMillis() } // triggers the download of all missing jar files from the job manager manager.registerTask(jobID, executionID, tdd.getRequiredJarFiles) - if (LOG.isDebugEnabled) { - LOG.debug("Register task {} at library cache manager took {}s", executionID, - (System.currentTimeMillis() - startRegisteringTask) / 1000.0) - } + log.debug(s"Register task $executionID at library cache manager " + + s"took ${(System.currentTimeMillis() - startRegisteringTask) / 1000.0}s") manager.getClassLoader(jobID) case None => throw new IllegalStateException("There is no valid library cache manager.") @@ -859,7 +850,7 @@ extends Actor with ActorLogMessages with ActorLogging { } // register the task with the network stack and profiles - LOG.info("Register task {}", task) + log.info(s"Register task $task.") network.registerTask(task) val cpTasks = new util.HashMap[String, FutureTask[Path]]() @@ -881,7 +872,7 @@ extends Actor with ActorLogMessages with ActorLogging { val message = if (t.isInstanceOf[CancelTaskException]) { "Task was canceled" } else { - LOG.error("Could not instantiate task with execution ID " + executionID, t) + log.error("Could not instantiate task with execution ID " + executionID, t) ExceptionUtils.stringifyException(t) } @@ -893,7 +884,7 @@ extends Actor with ActorLogMessages with ActorLogging { libraryCacheManager foreach { _.unregisterTask(jobID, executionID) } } catch { - case t: Throwable => LOG.error("Error during cleanup of task deployment.", t) + case t: Throwable => log.error("Error during cleanup of task deployment.", t) } sender ! new TaskOperationResult(executionID, false, message) @@ -924,7 +915,7 @@ extends Actor with ActorLogMessages with ActorLogging { reader.updateInputChannel(partitionInfo) } catch { case t: Throwable => - LOG.error(s"Could not update input data location for task " + + log.error(s"Could not update input data location for task " + s"${task.getTaskName}. 
Trying to fail task.", t) try { @@ -932,7 +923,7 @@ extends Actor with ActorLogMessages with ActorLogging { } catch { case t: Throwable => - LOG.error("Failed canceling task with execution ID " + executionId + + log.error("Failed canceling task with execution ID " + executionId + " after task update failure.", t) } } @@ -951,8 +942,8 @@ extends Actor with ActorLogMessages with ActorLogging { } case None => - LOG.debug("Discard update for input partitions of task {} : task is no longer running.", - executionId) + log.debug(s"Discard update for input partitions of task $executionId : " + + s"task is no longer running.") sender ! Acknowledge } } @@ -965,7 +956,7 @@ extends Actor with ActorLogMessages with ActorLogging { */ private def cancelAndClearEverything(cause: Throwable) { if (runningTasks.size > 0) { - LOG.info("Cancelling all computations and discarding all cached data.") + log.info("Cancelling all computations and discarding all cached data.") for (t <- runningTasks.values) { t.failExternally(cause) @@ -983,22 +974,22 @@ extends Actor with ActorLogMessages with ActorLogging { try { task.failExternally(new Exception("Task is being removed from TaskManager")) } catch { - case e: Exception => LOG.error("Could not properly fail task", e) + case e: Exception => log.error("Could not properly fail task", e) } } - LOG.info("Unregister task with execution ID {}.", executionID) + log.info(s"Unregister task with execution ID $executionID.") removeAllTaskResources(task) libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) } - LOG.info("Updating FINAL execution state of {} ({}) to {}.", - task.getTaskName, task.getExecutionId, task.getExecutionState) + log.info(s"Updating FINAL execution state of ${task.getTaskName} " + + s"(${task.getExecutionId}) to ${task.getExecutionState}.") self ! UpdateTaskExecutionState(new TaskExecutionState( task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause)) case None => - LOG.debug("Cannot find task with ID {} to unregister.", executionID) + log.debug(s"Cannot find task with ID $executionID to unregister.") } } @@ -1044,7 +1035,7 @@ extends Actor with ActorLogMessages with ActorLogging { } } catch { // this is pretty unpleasant, but not a reason to give up immediately - case e: Exception => LOG.error( + case e: Exception => log.error( "Error cleaning up local temp files from the distributed cache.", e) } } @@ -1060,14 +1051,14 @@ extends Actor with ActorLogMessages with ActorLogging { */ private def sendHeartbeatToJobManager(): Unit = { try { - LOG.debug("Sending heartbeat to JobManager") + log.debug("Sending heartbeat to JobManager") val report: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry) currentJobManager foreach { jm => jm ! Heartbeat(instanceID, report) } } catch { - case e: Exception => LOG.warn("Error sending the metric heartbeat to the JobManager", e) + case e: Exception => log.warn("Error sending the metric heartbeat to the JobManager", e) } } @@ -1093,7 +1084,7 @@ extends Actor with ActorLogMessages with ActorLogging { recipient ! StackTrace(instanceID, stackTraceStr) } catch { - case e: Exception => LOG.error("Failed to send stack trace to " + recipient.path, e) + case e: Exception => log.error("Failed to send stack trace to " + recipient.path, e) } } @@ -1103,7 +1094,7 @@ extends Actor with ActorLogMessages with ActorLogging { * @param cause The exception that caused the fatal problem. 
*/ private def killTaskManagerFatal(message: String, cause: Throwable): Unit = { - LOG.error("\n" + + log.error("\n" + "==============================================================\n" + "====================== FATAL =======================\n" + "==============================================================\n" + @@ -1121,7 +1112,7 @@ extends Actor with ActorLogMessages with ActorLogging { object TaskManager { /** TaskManager logger for synchronous logging (not through the logging actor) */ - val LOG = LoggerFactory.getLogger(classOf[TaskManager]) + val LOG = Logger(classOf[TaskManager]) /** Return code for unsuccessful TaskManager startup */ val STARTUP_FAILURE_RETURN_CODE = 1 @@ -1159,7 +1150,7 @@ object TaskManager { */ def main(args: Array[String]): Unit = { // startup checks and logging - EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args) + EnvironmentInformation.logEnvironmentInfo(LOG.logger, "TaskManager", args) EnvironmentInformation.checkJavaVersion() // try to parse the command line arguments @@ -1283,8 +1274,8 @@ object TaskManager { LOG.info("Trying to select the network interface and address to use " + "by connecting to the configured JobManager.") - LOG.info("TaskManager will try to connect for {} seconds before falling back to heuristics", - MAX_STARTUP_CONNECT_TIME) + LOG.info(s"TaskManager will try to connect for $MAX_STARTUP_CONNECT_TIME seconds before " + + "falling back to heuristics") val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort) val taskManagerAddress = try { @@ -1361,7 +1352,7 @@ object TaskManager { // Bring up the TaskManager actor system first, bind it to the given address. - LOG.info("Starting TaskManager actor system at {}:{}", taskManagerHostname, actorSystemPort) + LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort") val taskManagerSystem = try { val akkaConfig = AkkaUtils.getAkkaConfig(configuration, @@ -1400,7 +1391,7 @@ object TaskManager { // the process reaper will kill the JVM process (to ensure easy failure detection) LOG.debug("Starting TaskManager process reaper") taskManagerSystem.actorOf( - Props(classOf[ProcessReaper], taskManager, LOG, RUNTIME_FAILURE_RETURN_CODE), + Props(classOf[ProcessReaper], taskManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE), "TaskManager_Process_Reaper") // if desired, start the logging daemon that periodically logs the @@ -1521,7 +1512,7 @@ object TaskManager { "pick a fraction of the available memory.") val memorySize = if (configuredMemory > 0) { - LOG.info("Using {} MB for Flink managed memory.", configuredMemory) + LOG.info(s"Using $configuredMemory MB for Flink managed memory.") configuredMemory << 20 // megabytes to bytes } else { @@ -1534,8 +1525,8 @@ object TaskManager { val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction).toLong - LOG.info("Using {} of the currently free heap space for Flink managed memory ({} MB).", - fraction, relativeMemSize >> 20) + LOG.info(s"Using $fraction of the currently free heap space for Flink managed " + + s"memory (${relativeMemSize >> 20} MB).") relativeMemSize } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala index 51e99f9f22753e55c3eb643c325a6d1bdcdce69a..f0079f85e5b856dcee92862d95221bd41603357c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala +++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala @@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory import java.util.concurrent.TimeUnit import akka.actor.{Cancellable, ActorRef, Actor, ActorLogging} -import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.execution.{RuntimeEnvironment, ExecutionState} import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.jobgraph.JobVertexID @@ -44,7 +44,7 @@ import scala.concurrent.duration.FiniteDuration * @param reportInterval Interval of profiling action */ class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) - extends Actor with ActorLogMessages with ActorLogging { + extends Actor with ActorLogMessages with ActorSynchronousLogging { import context.dispatcher @@ -101,7 +101,7 @@ class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) Some(instanceProfiler.generateProfilingData(timestamp)) } catch { case e: ProfilingException => - log.error(e, "Error while retrieving instance profiling data.") + log.error("Error while retrieving instance profiling data.", e) None } @@ -133,7 +133,7 @@ class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) case _ => } case None => - log.warning(s"Could not find environment for execution id $executionID.") + log.warn(s"Could not find environment for execution id $executionID.") } } diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index b400bb9794fd6668d992ce3bf59100f4f129947d..d6760ecf2f24cb29079e85720004b2781e954681 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -23,7 +23,7 @@ import java.net.InetSocketAddress import akka.actor._ import akka.pattern.ask import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus @@ -34,8 +34,10 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.{Failure, Success} -class ApplicationClient(flinkConfig: Configuration) extends Actor - with ActorLogMessages with ActorLogging { +class ApplicationClient(flinkConfig: Configuration) + extends Actor + with ActorLogMessages + with ActorSynchronousLogging { import context._ val INITIAL_POLLING_DELAY = 0 seconds @@ -78,8 +80,8 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor jobManagerFuture.onComplete { case Success(jm) => self ! JobManagerActorRef(jm) case Failure(t) => - log.error(t, "Registration at JobManager/ApplicationMaster failed. Shutting " + - "ApplicationClient down.") + log.error("Registration at JobManager/ApplicationMaster failed. Shutting " + + "ApplicationClient down.", t) // we could not connect to the job manager --> poison ourselves self ! PoisonPill @@ -93,7 +95,7 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor // sender as the Application Client (this class). (jm ? 
RegisterClient(self))(timeout).onFailure{ case t: Throwable => - log.error(t, "Could not register at the job manager.") + log.error("Could not register at the job manager.", t) self ! PoisonPill } @@ -144,7 +146,7 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor // ----------------- handle messages from the cluster ------------------- // receive remote messages case msg: YarnMessage => - log.debug("Received new YarnMessage {}. Now {} messages in queue", msg, messagesQueue.size) + log.debug(s"Received new YarnMessage $msg. Now ${messagesQueue.size} messages in queue") messagesQueue.enqueue(msg) // locally forward messages diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index 0fdef4775489623cdb6588b5212f2562c9b47bf1..06e16dcb6b2705e7b51e0a9ec058e3e7f34f03b3 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -21,6 +21,7 @@ import java.io.{PrintWriter, FileWriter, BufferedWriter} import java.security.PrivilegedAction import akka.actor._ +import grizzled.slf4j.Logger import org.apache.flink.client.CliFrontend import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants} import org.apache.flink.runtime.akka.AkkaUtils @@ -38,7 +39,7 @@ import scala.io.Source object ApplicationMaster { import scala.collection.JavaConversions._ - val LOG = LoggerFactory.getLogger(this.getClass) + val LOG = Logger(getClass) val CONF_FILE = "flink-conf.yaml" val MODIFIED_CONF_FILE = "flink-conf-modified.yaml" @@ -50,9 +51,9 @@ object ApplicationMaster { LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " + s"setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}") - EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster/JobManager", args) + EnvironmentInformation.logEnvironmentInfo(LOG.logger, "YARN ApplicationMaster/JobManager", args) EnvironmentInformation.checkJavaVersion() - org.apache.flink.runtime.util.SignalHandler.register(LOG) + org.apache.flink.runtime.util.SignalHandler.register(LOG.logger) val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername) @@ -206,7 +207,7 @@ object ApplicationMaster { (Configuration, ActorSystem, ActorRef, ActorRef) = { LOG.info("Starting JobManager for YARN") - LOG.info("Loading config from: {}", currDir) + LOG.info(s"Loading config from: $currDir.") GlobalConfiguration.loadConfiguration(currDir) val configuration = GlobalConfiguration.getConfiguration() diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala index f3841306a201d317dd7330172ea21458aa0045f2..26d1f691176e9c44f6c83cadc08942f95a8b045d 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala @@ -98,7 +98,7 @@ trait ApplicationMasterActor extends ActorLogMessages { def receiveYarnMessages: Receive = { case StopYarnSession(status, diag) => - log.info("Stopping YARN JobManager with status {} and diagnostic {}.", status, diag) + log.info(s"Stopping YARN JobManager with status $status and diagnostic $diag.") instanceManager.getAllRegisteredInstances.asScala foreach { instance => @@ -108,11 +108,11 @@ trait ApplicationMasterActor extends 
ActorLogMessages { rmClientOption foreach { rmClient => Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{ - case t: Throwable => log.error(t, "Could not unregister the application master.") + case t: Throwable => log.error("Could not unregister the application master.", t) } Try(rmClient.close()).recover{ - case t:Throwable => log.error(t, "Could not close the AMRMClient.") + case t:Throwable => log.error("Could not close the AMRMClient.", t) } } @@ -121,7 +121,7 @@ trait ApplicationMasterActor extends ActorLogMessages { nmClientOption foreach { nmClient => Try(nmClient.close()).recover{ - case t: Throwable => log.error(t, "Could not close the NMClient.") + case t: Throwable => log.error("Could not close the NMClient.", t) } } @@ -133,7 +133,7 @@ trait ApplicationMasterActor extends ActorLogMessages { context.system.shutdown() case RegisterClient(client) => - log.info("Register {} as client.", client.path) + log.info(s"Register ${client.path} as client.") messageListener = Some(client) sender ! Acknowledge @@ -142,7 +142,7 @@ trait ApplicationMasterActor extends ActorLogMessages { case msg: StopAMAfterJob => val jobId = msg.jobId - log.info("ApplicatonMaster will shut down YARN session when job {} has finished", jobId) + log.info(s"ApplicationMaster will shut down YARN session when job $jobId has finished.") stopWhenJobFinished = jobId sender() ! Acknowledge @@ -155,26 +155,26 @@ trait ApplicationMasterActor extends ActorLogMessages { startYarnSession(conf, actorSystemPort, webServerPort) case jnf: JobNotFound => - LOG.warn("Job with ID {} not found in JobManager", jnf.jobID) + log.warn(s"Job with ID ${jnf.jobID} not found in JobManager") if(stopWhenJobFinished == null) { - LOG.warn("The ApplicationMaster didn't expect to receive this message") + log.warn("The ApplicationMaster didn't expect to receive this message") } case jobStatus: CurrentJobStatus => if(stopWhenJobFinished == null) { - LOG.warn("Received job status {} which wasn't requested", jobStatus) + log.warn(s"Received job status $jobStatus which wasn't requested.") } else { if(stopWhenJobFinished != jobStatus.jobID) { - LOG.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " + + log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " + s"job $stopWhenJobFinished") } else { if(jobStatus.status.isTerminalState) { - LOG.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " + + log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " + s"Shutting down YARN session") self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED, s"The monitored job with ID ${jobStatus.jobID} has finished.") } else { - LOG.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}") + log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}") } } } @@ -229,14 +229,14 @@ trait ApplicationMasterActor extends ActorLogMessages { }) } // return containers if the RM wants them and we haven't allocated them yet.
-      val preemtionMessage = response.getPreemptionMessage
-      if(preemtionMessage != null) {
-        log.info("Received preemtion message from YARN {}", preemtionMessage)
-        val contract = preemtionMessage.getContract
+      val preemptionMessage = response.getPreemptionMessage
+      if(preemptionMessage != null) {
+        log.info(s"Received preemption message from YARN $preemptionMessage.")
+        val contract = preemptionMessage.getContract
         if(contract != null) {
           tryToReturnContainers(contract.getContainers.asScala)
         }
-        val strictContract = preemtionMessage.getStrictContract
+        val strictContract = preemptionMessage.getStrictContract
         if(strictContract != null) {
           tryToReturnContainers(strictContract.getContainers.asScala)
         }
@@ -247,13 +247,13 @@ trait ApplicationMasterActor extends ActorLogMessages {
       // check if we want to start some of our allocated containers.
       if(runningContainers < numTaskManager) {
         var missingContainers = numTaskManager - runningContainers
-        log.info("The user requested {} containers, {} running. {} containers missing",
-          numTaskManager, runningContainers, missingContainers)
+        log.info(s"The user requested $numTaskManager containers, $runningContainers " +
+          s"running. $missingContainers containers missing")
         // not enough containers running
         if(allocatedContainersList.size > 0) {
-          log.info("{} containers already allocated by YARN. Starting...",
-            allocatedContainersList.size)
+          log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " +
+            "Starting...")
 
           // we have some containers allocated to us --> start them
           allocatedContainersList = allocatedContainersList.dropWhile(container => {
             if (missingContainers <= 0) {
@@ -279,7 +279,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
               }
             } catch {
               case e: YarnException =>
-                log.error(e, "Exception while starting YARN container")
+                log.error("Exception while starting YARN container", e)
             }
           }
         case None =>
@@ -305,24 +305,24 @@ trait ApplicationMasterActor extends ActorLogMessages {
           log.info(s"There are $missingContainers containers missing." +
             s" $numPendingRequests are already requested. " +
             s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
-            s"Reallocation of failed containers is enabled=$reallocate ('{}')",
-            ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS)
+            s"Reallocation of failed containers is enabled=$reallocate " +
+            s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
           // there are still containers missing. Request them from YARN
           if(reallocate) {
             for(i <- 1 to toAllocateFromYarn) {
               val containerRequest = getContainerRequest(memoryPerTaskManager)
               rmClient.addContainerRequest(containerRequest)
               numPendingRequests += 1
-              log.info("Requested additional container from YARN. Pending requests {}",
-                numPendingRequests)
+              log.info("Requested additional container from YARN. Pending requests " +
+                s"$numPendingRequests.")
             }
           }
         }
       }
 
       if(runningContainers >= numTaskManager && allocatedContainersList.size > 0) {
-        log.info("Flink has {} allocated containers which are not needed right now. " +
-          "Returning them", allocatedContainersList.size)
+        log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
+          s"are not needed right now. Returning them")
         for(container <- allocatedContainersList) {
           rmClient.releaseAssignedContainer(container.getId)
         }
@@ -353,9 +353,9 @@ trait ApplicationMasterActor extends ActorLogMessages {
         self ! StopYarnSession(FinalApplicationStatus.FAILED, "Fatal error in AM: AMRMClient " +
           "was not set")
       }
-      log.debug("Processed Heartbeat with RMClient. Running containers {}," +
-        "failed containers {}, allocated containers {}", runningContainers, failedContainers,
-        allocatedContainersList.size)
+      log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers, " +
+        s"failed containers $failedContainers, " +
+        s"allocated containers ${allocatedContainersList.size}.")
     }
 
     private def runningContainerIds(): mutable.MutableList[ContainerId] = {
@@ -381,17 +381,16 @@ trait ApplicationMasterActor extends ActorLogMessages {
         YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS), MILLISECONDS)
 
       if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) {
-        log.warning("The heartbeat interval of the Flink Application master ({}) is greater than " +
-          "YARN's expiry interval ({}). The application is likely to be killed by YARN.",
-          YARN_HEARTBEAT_DELAY, yarnExpiryInterval)
+        log.warn(s"The heartbeat interval of the Flink Application master " +
+          s"($YARN_HEARTBEAT_DELAY) is greater than YARN's expiry interval " +
+          s"($yarnExpiryInterval). The application is likely to be killed by YARN.")
       }
 
       numTaskManager = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
       maxFailedContainers = flinkConfiguration.
         getInteger(ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numTaskManager)
-      log.info("Requesting {} TaskManagers. Tolerating {} failed TaskManagers",
-        numTaskManager, maxFailedContainers)
-
+      log.info(s"Requesting $numTaskManager TaskManagers. Tolerating $maxFailedContainers failed " +
+        "TaskManagers")
 
       val remoteFlinkJarPath = env.get(FlinkYarnClient.FLINK_JAR_PATH)
       val fs = FileSystem.get(conf)
@@ -468,7 +467,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
             context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn)
         } recover {
           case t: Throwable =>
-            log.error(t, "Could not start yarn session.")
+            log.error("Could not start yarn session.", t)
             self ! StopYarnSession(FinalApplicationStatus.FAILED,
               s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}")
         }
@@ -479,7 +478,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
       allocatedContainersList = allocatedContainersList.dropWhile( container => {
         val result = requestedBackContainers.getId.equals(container.getId)
         if(result) {
-          log.info("Returning container {} back to ResourceManager.", container)
+          log.info(s"Returning container $container back to ResourceManager.")
         }
         result
       })
@@ -548,7 +547,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
       ctx.setTokens(securityTokens)
     } catch {
       case t: Throwable =>
-        log.error(t, "Getting current user info failed when trying to launch the container")
+        log.error("Getting current user info failed when trying to launch the container", t)
     }
 
     ctx
diff --git a/pom.xml b/pom.xml
index 524e45fc018e33c359879d23ab05c05c7027c170..1278386888362a5e5e9f71a70933b4ade7cf41fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -274,6 +274,12 @@ under the License.
 				<version>${scala.version}</version>
 			</dependency>
 
+			<dependency>
+				<groupId>org.clapper</groupId>
+				<artifactId>grizzled-slf4j_${scala.binary.version}</artifactId>
+				<version>1.0.2</version>
+			</dependency>
+
 			<dependency>
 				<groupId>com.typesafe.akka</groupId>
 				<artifactId>akka-actor_${scala.binary.version}</artifactId>