diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java index acc37cf99cf45012b51b211a61df784de7904efa..ffb1cc7ae550c7ba1f1096f959cfb23ff9585383 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java @@ -24,48 +24,36 @@ import org.apache.flink.annotation.PublicEvolving; * A Counter is a {@link Metric} that measures a count. */ @PublicEvolving -public final class Counter implements Metric { - - private long count; +public interface Counter extends Metric { /** * Increment the current count by 1. */ - public void inc() { - count++; - } + void inc(); /** * Increment the current count by the given value. * * @param n value to increment the current count by */ - public void inc(long n) { - count += n; - } + void inc(long n); /** * Decrement the current count by 1. */ - public void dec() { - count--; - } + void dec(); /** * Decrement the current count by the given value. * * @param n value to decrement the current count by */ - public void dec(long n) { - count -= n; - } + void dec(long n); /** * Returns the current count. * * @return current count */ - public long getCount() { - return count; - } + long getCount(); } diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java index aad8deb5df9ef5bf925194abd86093d50bc1ebea..740645d32565df4328a775a1bcad2bd3b16990f9 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java @@ -24,12 +24,11 @@ import org.apache.flink.annotation.PublicEvolving; * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ @PublicEvolving -public abstract class Gauge implements Metric { - +public interface Gauge extends Metric { /** * Calculates and returns the measured value. * * @return calculated value */ - public abstract T getValue(); + T getValue(); } diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java index 6c9e04477acd2455d049e4690c34b77ba67662a9..b13194961cb60dd82d4695ef8783109477cf67d3 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java @@ -63,7 +63,7 @@ public interface MetricGroup { * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. * * @param name name of the counter - * @return the registered counter + * @return the created counter */ Counter counter(int name); @@ -71,19 +71,39 @@ public interface MetricGroup { * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. * * @param name name of the counter - * @return the registered counter + * @return the created counter */ Counter counter(String name); + /** + * Registers a {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @param counter counter to register + * @param counter type + * @return the given counter + */ + C counter(int name, C counter); + + /** + * Registers a {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @param counter counter to register + * @param counter type + * @return the given counter + */ + C counter(String name, C counter); + /** * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. * * @param name name of the gauge * @param gauge gauge to register * @param return type of the gauge - * @return the registered gauge + * @return the given gauge */ - Gauge gauge(int name, Gauge gauge); + > G gauge(int name, G gauge); /** * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. @@ -91,9 +111,9 @@ public interface MetricGroup { * @param name name of the gauge * @param gauge gauge to register * @param return type of the gauge - * @return the registered gauge + * @return the given gauge */ - Gauge gauge(String name, Gauge gauge); + > G gauge(String name, G gauge); // ------------------------------------------------------------------------ // Groups diff --git a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java new file mode 100644 index 0000000000000000000000000000000000000000..9720b080243268a873e085fb449ce0475ffb7ea4 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +/** + * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not thread-safe. + */ +public class SimpleCounter implements Counter { + private long count; + + /** + * Increment the current count by 1. + */ + @Override + public void inc() { + count++; + } + + /** + * Increment the current count by the given value. + * + * @param n value to increment the current count by + */ + @Override + public void inc(long n) { + count += n; + } + + /** + * Decrement the current count by 1. + */ + @Override + public void dec() { + count--; + } + + /** + * Decrement the current count by the given value. + * + * @param n value to decrement the current count by + */ + @Override + public void dec(long n) { + count -= n; + } + + /** + * Returns the current count. + * + * @return current count + */ + @Override + public long getCount() { + return count; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java index 032fa04b1151b6f7b114d83766e4d6431038c8cb..93eb7348073d2c55a0c4990e75150a349ea69ae8 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java @@ -24,6 +24,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.groups.scope.ScopeFormat; import org.slf4j.Logger; @@ -146,18 +147,27 @@ public abstract class AbstractMetricGroup implements MetricGroup { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { + return counter(String.valueOf(name), counter); + } + + @Override + public C counter(String name, C counter) { addMetric(name, counter); return counter; } @Override - public Gauge gauge(int name, Gauge gauge) { + public > G gauge(int name, G gauge) { return gauge(String.valueOf(name), gauge); } @Override - public Gauge gauge(String name, Gauge gauge) { + public > G gauge(String name, G gauge) { addMetric(name, gauge); return gauge; } diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java index 961bcceae6a5c7f3fdc99e988e21af9ecf799a3b..29d71d9ff7bd40762d9f8433f9567d1212995b23 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; /** * A special {@link MetricGroup} that does not register any metrics at the metrics registry @@ -42,21 +43,31 @@ public class UnregisteredMetricsGroup implements MetricGroup { @Override public Counter counter(int name) { - return new Counter(); + return new SimpleCounter(); } @Override public Counter counter(String name) { - return new Counter(); + return new SimpleCounter(); } @Override - public Gauge gauge(int name, Gauge gauge) { + public C counter(int name, C counter) { + return counter; + } + + @Override + public C counter(String name, C counter) { + return counter; + } + + @Override + public > G gauge(int name, G gauge) { return gauge; } @Override - public Gauge gauge(String name, Gauge gauge) { + public > G gauge(String name, G gauge) { return gauge; } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 8ef22afbf982a81cc012329f36e96ef5a4559af8..1fb0e090d961312a5287759defaf4795126b96c7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -2294,11 +2294,10 @@ object TaskManager { private def instantiateClassLoaderMetrics(metrics: MetricGroup) { val mxBean = ManagementFactory.getClassLoadingMXBean - metrics - .gauge("ClassesLoaded", new FlinkGauge[Long] { + metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] { override def getValue: Long = mxBean.getTotalLoadedClassCount }) - metrics.gauge("ClassesUnloaded", new FlinkGauge[Long] { + metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] { override def getValue: Long = mxBean.getUnloadedClassCount }) } @@ -2308,10 +2307,10 @@ object TaskManager { for (garbageCollector <- garbageCollectors) { val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"") - gcGroup.gauge("Count", new FlinkGauge[Long] { + gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { override def getValue: Long = garbageCollector.getCollectionCount }) - gcGroup.gauge("Time", new FlinkGauge[Long] { + gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] { override def getValue: Long = garbageCollector.getCollectionTime }) } @@ -2320,24 +2319,24 @@ object TaskManager { private def instantiateMemoryMetrics(metrics: MetricGroup) { val mxBean = ManagementFactory.getMemoryMXBean val heap = metrics.addGroup("Heap") - heap.gauge("Used", new FlinkGauge[Long] { + heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] { override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed }) - heap.gauge("Committed", new FlinkGauge[Long] { + heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] { override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted }) - heap.gauge("Max", new FlinkGauge[Long] { + heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] { override def getValue: Long = mxBean.getHeapMemoryUsage.getMax }) val nonHeap = metrics.addGroup("NonHeap") - nonHeap.gauge("Used", new FlinkGauge[Long] { + nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] { override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed }) - nonHeap.gauge("Committed", new FlinkGauge[Long] { + nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] { override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted }) - nonHeap.gauge("Max", new FlinkGauge[Long] { + nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] { override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax }) @@ -2346,15 +2345,15 @@ object TaskManager { val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct") val direct = metrics.addGroup("Direct") - direct.gauge("Count", new FlinkGauge[Long] { + direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(directObjectName, "Count").asInstanceOf[Long] }) - direct.gauge("MemoryUsed", new FlinkGauge[Long] { + direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long] }) - direct.gauge("TotalCapacity", new FlinkGauge[Long] { + direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long] }) @@ -2362,15 +2361,15 @@ object TaskManager { val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=direct") val mapped = metrics.addGroup("Mapped") - mapped.gauge("Count", new FlinkGauge[Long] { + mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(mappedObjectName, "Count").asInstanceOf[Long] }) - mapped.gauge("MemoryUsed", new FlinkGauge[Long] { + mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long] }) - mapped.gauge("TotalCapacity", new FlinkGauge[Long] { + mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] { override def getValue: Long = con .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long] }) @@ -2379,8 +2378,7 @@ object TaskManager { private def instantiateThreadMetrics(metrics: MetricGroup): Unit = { val mxBean = ManagementFactory.getThreadMXBean - metrics - .gauge("Count", new FlinkGauge[Int] { + metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] { override def getValue: Int = mxBean.getThreadCount }) } @@ -2390,11 +2388,10 @@ object TaskManager { val mxBean = ManagementFactory.getOperatingSystemMXBean .asInstanceOf[com.sun.management.OperatingSystemMXBean] - metrics - .gauge("Load", new FlinkGauge[Double] { + metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] { override def getValue: Double = mxBean.getProcessCpuLoad }) - metrics.gauge("Time", new FlinkGauge[Long] { + metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] { override def getValue: Long = mxBean.getProcessCpuTime }) } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java index da15f08de330a45c5d9c3add9ff5cb9d5aef3419..0868398377fb6b7573310a2076c2afc44dac8fc5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -119,7 +119,7 @@ public class InputChannelTest { ResultPartitionID partitionId, Tuple2 initialAndMaxBackoff) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter()); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new SimpleCounter()); } @Override