From 35ea6505cd66f29d9e9d382fc4133dc4aac75923 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 21 May 2015 20:38:31 +0200 Subject: [PATCH] [runtime] Extend memory and GC monitor logging. --- .../runtime/taskmanager/MemoryLogger.java | 183 ++++++++++++++++++ .../runtime/taskmanager/TaskManager.scala | 61 +----- .../test/classloading/ClassLoaderITCase.java | 6 +- 3 files changed, 188 insertions(+), 62 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java new file mode 100644 index 00000000000..5c821e902ed --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java @@ -0,0 +1,183 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.flink.runtime.taskmanager; + +import akka.actor.ActorSystem; + +import org.slf4j.Logger; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryType; +import java.lang.management.MemoryUsage; +import java.util.List; + +/** + * A thread the periodically logs statistics about: + * + */ +public class MemoryLogger extends Thread { + + private final Logger logger; + + private final long interval; + + private final MemoryMXBean memoryBean; + + private final List poolBeans; + + private final List gcBeans; + + private final ActorSystem monitored; + + private volatile boolean running = true; + + + public MemoryLogger(Logger logger, long interval) { + this(logger, interval, null); + } + + public MemoryLogger(Logger logger, long interval, ActorSystem monitored) { + super("Memory Logger"); + setDaemon(true); + setPriority(Thread.MIN_PRIORITY); + + this.logger = logger; + this.interval = interval; + this.monitored = monitored; + + this.memoryBean = ManagementFactory.getMemoryMXBean(); + this.poolBeans = ManagementFactory.getMemoryPoolMXBeans(); + this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); + } + + public void shutdown() { + this.running = false; + interrupt(); + } + + // ------------------------------------------------------------------------ + + @Override + public void run() { + try { + while (running && (monitored == null || !monitored.isTerminated())) { + logger.info(getMemoryUsageStatsAsString(memoryBean)); + logger.info(getMemoryPoolStatsAsString(poolBeans)); + logger.info(getGarbageCollectorStatsAsString(gcBeans)); + + try { + Thread.sleep(interval); + } + catch (InterruptedException e) { + if (running) { + throw e; + } + } + } + } + catch (Throwable t) { + logger.error("Memory logger terminated with exception", t); + } + } + + // ------------------------------------------------------------------------ + + /** + * Gets the memory footprint of the JVM in a string representation. + * + * @return A string describing how much heap memory and direct memory are allocated and used. + */ + public static String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) { + MemoryUsage heap = memoryMXBean.getHeapMemoryUsage(); + MemoryUsage nonHeap = memoryMXBean.getNonHeapMemoryUsage(); + + long heapUsed = heap.getUsed() >> 20; + long heapCommitted = heap.getCommitted() >> 20; + long heapMax = heap.getMax() >> 20; + + long nonHeapUsed = nonHeap.getUsed() >> 20; + long nonHeapCommitted = nonHeap.getCommitted() >> 20; + long nonHeapMax = nonHeap.getMax() >> 20; + + return String.format("Memory usage stats: [HEAP: %d/%d/%d MB, " + + "NON HEAP: %d/%d/%d MB (used/committed/max)]", + heapUsed, heapCommitted, heapMax, nonHeapUsed, nonHeapCommitted, nonHeapMax); + } + + /** + * Gets the memory pool statistics from the JVM. + * + * @param poolBeans The collection of memory pool beans. + * @return A string denoting the names and sizes of the memory pools. + */ + public static String getMemoryPoolStatsAsString(List poolBeans) { + StringBuilder bld = new StringBuilder("Off-heap pool stats: "); + int count = 0; + + for (MemoryPoolMXBean bean : poolBeans) { + if (bean.getType() == MemoryType.NON_HEAP) { + if (count > 0) { + bld.append(", "); + } + count++; + + MemoryUsage usage = bean.getUsage(); + long used = usage.getUsed() >> 20; + long committed = usage.getCommitted() >> 20; + long max = usage.getMax() >> 20; + + bld.append('[').append(bean.getName()).append(": "); + bld.append(used).append('/').append(committed).append('/').append(max); + bld.append(" MB (used/committed/max)]"); + } + } + + return bld.toString(); + } + + /** + * Gets the garbage collection statistics from the JVM. + * + * @param gcMXBeans The collection of garbage collector beans. + * @return A string denoting the number of times and total elapsed time in garbage collection. + */ + public static String getGarbageCollectorStatsAsString(List gcMXBeans) { + StringBuilder bld = new StringBuilder("Garbage collector stats: "); + + for (GarbageCollectorMXBean bean : gcMXBeans) { + bld.append('[').append(bean.getName()).append(", GC TIME (ms): ").append(bean.getCollectionTime()); + bld.append(", GC COUNT: ").append(bean.getCollectionCount()).append(']'); + + bld.append(", "); + } + + if (!gcMXBeans.isEmpty()) { + bld.setLength(bld.length() - 2); + } + + return bld.toString(); + } +} diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index e6fcca28fd0..6a896242f8f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -176,7 +176,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // log the initial memory utilization if (log.isInfoEnabled) { - log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean)) + log.info(MemoryLogger.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean)) } // kick off the registration @@ -1278,25 +1278,7 @@ object TaskManager { ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS, ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS) - val logger = new Thread("Memory Usage Logger") { - override def run(): Unit = { - try { - val memoryMXBean = ManagementFactory.getMemoryMXBean - val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala - - while (!taskManagerSystem.isTerminated) { - Thread.sleep(interval) - LOG.info(getMemoryUsageStatsAsString(memoryMXBean)) - LOG.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans)) - } - } - catch { - case t: Throwable => LOG.error("Memory usage logging thread died", t) - } - } - } - logger.setDaemon(true) - logger.setPriority(Thread.MIN_PRIORITY) + val logger = new MemoryLogger(LOG.logger, interval, taskManagerSystem) logger.start() } @@ -1717,45 +1699,6 @@ object TaskManager { } } - /** - * Gets the memory footprint of the JVM in a string representation. - * - * @param memoryMXBean The memory management bean used to access the memory statistics. - * @return A string describing how much heap memory and direct memory are allocated and used. - */ - private def getMemoryUsageStatsAsString(memoryMXBean: MemoryMXBean): String = { - val heap = memoryMXBean.getHeapMemoryUsage - val nonHeap = memoryMXBean.getNonHeapMemoryUsage - - val heapUsed = heap.getUsed >> 20 - val heapCommitted = heap.getCommitted >> 20 - val heapMax = heap.getMax >> 20 - - val nonHeapUsed = nonHeap.getUsed >> 20 - val nonHeapCommitted = nonHeap.getCommitted >> 20 - val nonHeapMax = nonHeap.getMax >> 20 - - s"Memory usage stats: [HEAP: $heapUsed/$heapCommitted/$heapMax MB, " + - s"NON HEAP: $nonHeapUsed/$nonHeapCommitted/$nonHeapMax MB (used/committed/max)]" - } - - /** - * Gets the garbage collection statistics from the JVM. - * - * @param gcMXBeans The collection of garbage collector beans. - * @return A string denoting the number of times and total elapsed time in garbage collection. - */ - private def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean]) - : String = { - val beans = gcMXBeans map { - bean => - s"[${bean.getName}, GC TIME (ms): ${bean.getCollectionTime}, " + - s"GC COUNT: ${bean.getCollectionCount}]" - } mkString ", " - - "Garbage collector stats: " + beans - } - /** * Creates the registry of default metrics, including stats about garbage collection, memory * usage, and system CPU load. diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 09fd14ce38b..9069573a447 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -76,9 +76,9 @@ public class ClassLoaderITCase { testCluster.shutdown(); } } - catch (Throwable t) { - t.printStackTrace(); - Assert.fail(t.getMessage()); + catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); } } } -- GitLab