From d43bf8d9b3085d1341bfca61e05c2a77e5426226 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 22 Jun 2016 10:37:03 +0200 Subject: [PATCH] [FLINK-4093] Expose metric interfaces This closes #2134 --- .../org/apache/flink/metrics/Counter.java | 24 ++----- .../java/org/apache/flink/metrics/Gauge.java | 5 +- .../org/apache/flink/metrics/MetricGroup.java | 32 +++++++-- .../apache/flink/metrics/SimpleCounter.java | 71 +++++++++++++++++++ .../metrics/groups/AbstractMetricGroup.java | 16 ++++- .../groups/UnregisteredMetricsGroup.java | 19 +++-- .../runtime/taskmanager/TaskManager.scala | 41 +++++------ .../partition/consumer/InputChannelTest.java | 4 +- 8 files changed, 154 insertions(+), 58 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java index acc37cf99cf..ffb1cc7ae55 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java @@ -24,48 +24,36 @@ import org.apache.flink.annotation.PublicEvolving; * A Counter is a {@link Metric} that measures a count. */ @PublicEvolving -public final class Counter implements Metric { - - private long count; +public interface Counter extends Metric { /** * Increment the current count by 1. */ - public void inc() { - count++; - } + void inc(); /** * Increment the current count by the given value. * * @param n value to increment the current count by */ - public void inc(long n) { - count += n; - } + void inc(long n); /** * Decrement the current count by 1. */ - public void dec() { - count--; - } + void dec(); /** * Decrement the current count by the given value. * * @param n value to decrement the current count by */ - public void dec(long n) { - count -= n; - } + void dec(long n); /** * Returns the current count. * * @return current count */ - public long getCount() { - return count; - } + long getCount(); } diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java index aad8deb5df9..740645d3256 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java @@ -24,12 +24,11 @@ import org.apache.flink.annotation.PublicEvolving; * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ @PublicEvolving -public abstract class Gauge implements Metric { - +public interface Gauge extends Metric { /** * Calculates and returns the measured value. * * @return calculated value */ - public abstract T getValue(); + T getValue(); } diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java index 6c9e04477ac..b13194961cb 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java @@ -63,7 +63,7 @@ public interface MetricGroup { * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. * * @param name name of the counter - * @return the registered counter + * @return the created counter */ Counter counter(int name); @@ -71,19 +71,39 @@ public interface MetricGroup { * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. * * @param name name of the counter - * @return the registered counter + * @return the created counter */ Counter counter(String name); + /** + * Registers a {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @param counter counter to register + * @param counter type + * @return the given counter + */ + C counter(int name, C counter); + + /** + * Registers a {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @param counter counter to register + * @param counter type + * @return the given counter + */ + C counter(String name, C counter); + /** * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. * * @param name name of the gauge * @param gauge gauge to register * @param return type of the gauge - * @return the registered gauge + * @return the given gauge */ - Gauge gauge(int name, Gauge gauge); + > G gauge(int name, G gauge); /** * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. @@ -91,9 +111,9 @@ public interface MetricGroup { * @param name name of the gauge * @param gauge gauge to register * @param return type of the gauge - * @return the registered gauge + * @return the given gauge */ - Gauge gauge(String name, Gauge gauge); + > G gauge(String name, G gauge); // ------------------------------------------------------------------------ // Groups diff --git a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java new file mode 100644 index 00000000000..9720b080243 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +/** + * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not thread-safe. + */ +public class SimpleCounter implements Counter { + private long count; + + /** + * Increment the current count by 1. + */ + @Override + public void inc() { + count++; + } + + /** + * Increment the current count by the given value. + * + * @param n value to increment the current count by + */ + @Override + public void inc(long n) { + count += n; + } + + /** + * Decrement the current count by 1. + */ + @Override + public void dec() { + count--; + } + + /** + * Decrement the current count by the given value. + * + * @param n value to decrement the current count by + */ + @Override + public void dec(long n) { + count -= n; + } + + /** + * Returns the current count. + * + * @return current count + */ + @Override + public long getCount() { + return count; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java index 032fa04b115..93eb7348073 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java @@ -24,6 +24,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.groups.scope.ScopeFormat; import org.slf4j.Logger; @@ -146,18 +147,27 @@ public abstract class AbstractMetricGroup implements MetricGroup { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { + return counter(String.valueOf(name), counter); + } + + @Override + public C counter(String name, C counter) { addMetric(name, counter); return counter; } @Override - public Gauge gauge(int name, Gauge gauge) { + public > G gauge(int name, G gauge) { return gauge(String.valueOf(name), gauge); } @Override - public Gauge gauge(String name, Gauge gauge) { + public > G gauge(String name, G gauge) { addMetric(name, gauge); return gauge; } diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java index 961bcceae6a..29d71d9ff7b 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; /** * A special {@link MetricGroup} that does not register any metrics at the metrics registry @@ -42,21 +43,31 @@ public class UnregisteredMetricsGroup implements MetricGroup { @Override public Counter counter(int name) { - return new Counter(); + return new SimpleCounter(); } @Override public Counter counter(String name) { - return new Counter(); + return new SimpleCounter(); } @Override - public Gauge gauge(int name, Gauge gauge) { + public C counter(int name, C counter) { + return counter; + } + + @Override + public C counter(String name, C counter) { + return counter; + } + + @Override + public > G gauge(int name, G gauge) { return gauge; } @Override - public Gauge gauge(String name, Gauge gauge) { + public > G gauge(String name, G gauge) { return gauge; } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 8ef22afbf98..1fb0e090d96 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -2294,11 +2294,10 @@ object TaskManager { private def instantiateClassLoaderMetrics(metrics: MetricGroup) { val mxBean = ManagementFactory.getClassLoadingMXBean - metrics - .gauge("ClassesLoaded", new FlinkGauge[Long] { + metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] { override def getValue: Long = mxBean.getTotalLoadedClassCount }) - metrics.gauge("ClassesUnloaded", new FlinkGauge[Long] { + metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] { override def getValue: Long = mxBean.getUnloadedClassCount }) } @@ -2308,10 +2307,10 @@ object TaskManager { for (garbageCollector <- garbageCollectors) { val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"") - gcGroup.gauge("Count", new FlinkGauge[Long] { + gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { override def getValue: Long = garbageCollector.getCollectionCount }) - gcGroup.gauge("Time", new FlinkGauge[Long] { + gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] { override def getValue: Long = garbageCollector.getCollectionTime }) } @@ -2320,24 +2319,24 @@ object TaskManager { private def instantiateMemoryMetrics(metrics: MetricGroup) { val mxBean = ManagementFactory.getMemoryMXBean val heap = metrics.addGroup("Heap") - heap.gauge("Used", new FlinkGauge[Long] { + heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] { override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed }) - heap.gauge("Committed", new FlinkGauge[Long] { + heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] { override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted }) - heap.gauge("Max", new FlinkGauge[Long] { + heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] { override def getValue: Long = mxBean.getHeapMemoryUsage.getMax }) val nonHeap = metrics.addGroup("NonHeap") - nonHeap.gauge("Used", new FlinkGauge[Long] { + nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] { override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed }) - nonHeap.gauge("Committed", new FlinkGauge[Long] { + nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] { override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted }) - nonHeap.gauge("Max", new FlinkGauge[Long] { + nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] { override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax }) @@ -2346,15 +2345,15 @@ object TaskManager { val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct") val direct = metrics.addGroup("Direct") - direct.gauge("Count", new FlinkGauge[Long] { + direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(directObjectName, "Count").asInstanceOf[Long] }) - direct.gauge("MemoryUsed", new FlinkGauge[Long] { + direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long] }) - direct.gauge("TotalCapacity", new FlinkGauge[Long] { + direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long] }) @@ -2362,15 +2361,15 @@ object TaskManager { val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=direct") val mapped = metrics.addGroup("Mapped") - mapped.gauge("Count", new FlinkGauge[Long] { + mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(mappedObjectName, "Count").asInstanceOf[Long] }) - mapped.gauge("MemoryUsed", new FlinkGauge[Long] { + mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long] }) - mapped.gauge("TotalCapacity", new FlinkGauge[Long] { + mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long] }) @@ -2379,8 +2378,7 @@ object TaskManager { private def instantiateThreadMetrics(metrics: MetricGroup): Unit = { val mxBean = ManagementFactory.getThreadMXBean - metrics - .gauge("Count", new FlinkGauge[Int] { + metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] { override def getValue: Int = mxBean.getThreadCount }) } @@ -2390,11 +2388,10 @@ object TaskManager { val mxBean = ManagementFactory.getOperatingSystemMXBean .asInstanceOf[com.sun.management.OperatingSystemMXBean] - metrics - .gauge("Load", new FlinkGauge[Double] { + metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] { override def getValue: Double = mxBean.getProcessCpuLoad }) - metrics.gauge("Time", new FlinkGauge[Long] { + metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] { override def getValue: Long = mxBean.getProcessCpuTime }) } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java index da15f08de33..0868398377f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -119,7 +119,7 @@ public class InputChannelTest { ResultPartitionID partitionId, Tuple2 initialAndMaxBackoff) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter()); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new SimpleCounter()); } @Override -- GitLab