提交 d43bf8d9 编写于 作者: Z zentol

[FLINK-4093] Expose metric interfaces

This closes #2134
上级 62cb954d
......@@ -24,48 +24,36 @@ import org.apache.flink.annotation.PublicEvolving;
* A Counter is a {@link Metric} that measures a count.
*/
@PublicEvolving
public final class Counter implements Metric {
private long count;
public interface Counter extends Metric {
/**
* Increment the current count by 1.
*/
public void inc() {
count++;
}
void inc();
/**
* Increment the current count by the given value.
*
* @param n value to increment the current count by
*/
public void inc(long n) {
count += n;
}
void inc(long n);
/**
* Decrement the current count by 1.
*/
public void dec() {
count--;
}
void dec();
/**
* Decrement the current count by the given value.
*
* @param n value to decrement the current count by
*/
public void dec(long n) {
count -= n;
}
void dec(long n);
/**
* Returns the current count.
*
* @return current count
*/
public long getCount() {
return count;
}
long getCount();
}
......@@ -24,12 +24,11 @@ import org.apache.flink.annotation.PublicEvolving;
* A Gauge is a {@link Metric} that calculates a specific value at a point in time.
*/
@PublicEvolving
public abstract class Gauge<T> implements Metric {
public interface Gauge<T> extends Metric {
/**
* Calculates and returns the measured value.
*
* @return calculated value
*/
public abstract T getValue();
T getValue();
}
......@@ -63,7 +63,7 @@ public interface MetricGroup {
* Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
*
* @param name name of the counter
* @return the registered counter
* @return the created counter
*/
Counter counter(int name);
......@@ -71,19 +71,39 @@ public interface MetricGroup {
* Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
*
* @param name name of the counter
* @return the registered counter
* @return the created counter
*/
Counter counter(String name);
/**
* Registers a {@link org.apache.flink.metrics.Counter} with Flink.
*
* @param name name of the counter
* @param counter counter to register
* @param <C> counter type
* @return the given counter
*/
<C extends Counter> C counter(int name, C counter);
/**
* Registers a {@link org.apache.flink.metrics.Counter} with Flink.
*
* @param name name of the counter
* @param counter counter to register
* @param <C> counter type
* @return the given counter
*/
<C extends Counter> C counter(String name, C counter);
/**
* Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
*
* @param name name of the gauge
* @param gauge gauge to register
* @param <T> return type of the gauge
* @return the registered gauge
* @return the given gauge
*/
<T> Gauge<T> gauge(int name, Gauge<T> gauge);
<T, G extends Gauge<T>> G gauge(int name, G gauge);
/**
* Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
......@@ -91,9 +111,9 @@ public interface MetricGroup {
* @param name name of the gauge
* @param gauge gauge to register
* @param <T> return type of the gauge
* @return the registered gauge
* @return the given gauge
*/
<T> Gauge<T> gauge(String name, Gauge<T> gauge);
<T, G extends Gauge<T>> G gauge(String name, G gauge);
// ------------------------------------------------------------------------
// Groups
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics;
/**
* A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not thread-safe.
*/
public class SimpleCounter implements Counter {
private long count;
/**
* Increment the current count by 1.
*/
@Override
public void inc() {
count++;
}
/**
* Increment the current count by the given value.
*
* @param n value to increment the current count by
*/
@Override
public void inc(long n) {
count += n;
}
/**
* Decrement the current count by 1.
*/
@Override
public void dec() {
count--;
}
/**
* Decrement the current count by the given value.
*
* @param n value to decrement the current count by
*/
@Override
public void dec(long n) {
count -= n;
}
/**
* Returns the current count.
*
* @return current count
*/
@Override
public long getCount() {
return count;
}
}
......@@ -24,6 +24,7 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.scope.ScopeFormat;
import org.slf4j.Logger;
......@@ -146,18 +147,27 @@ public abstract class AbstractMetricGroup implements MetricGroup {
@Override
public Counter counter(String name) {
Counter counter = new Counter();
return counter(name, new SimpleCounter());
}
@Override
public <C extends Counter> C counter(int name, C counter) {
return counter(String.valueOf(name), counter);
}
@Override
public <C extends Counter> C counter(String name, C counter) {
addMetric(name, counter);
return counter;
}
@Override
public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
return gauge(String.valueOf(name), gauge);
}
@Override
public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
addMetric(name, gauge);
return gauge;
}
......
......@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
/**
* A special {@link MetricGroup} that does not register any metrics at the metrics registry
......@@ -42,21 +43,31 @@ public class UnregisteredMetricsGroup implements MetricGroup {
@Override
public Counter counter(int name) {
return new Counter();
return new SimpleCounter();
}
@Override
public Counter counter(String name) {
return new Counter();
return new SimpleCounter();
}
@Override
public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
public <C extends Counter> C counter(int name, C counter) {
return counter;
}
@Override
public <C extends Counter> C counter(String name, C counter) {
return counter;
}
@Override
public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
return gauge;
}
@Override
public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
return gauge;
}
......
......@@ -2294,11 +2294,10 @@ object TaskManager {
private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
val mxBean = ManagementFactory.getClassLoadingMXBean
metrics
.gauge("ClassesLoaded", new FlinkGauge[Long] {
metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getTotalLoadedClassCount
})
metrics.gauge("ClassesUnloaded", new FlinkGauge[Long] {
metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getUnloadedClassCount
})
}
......@@ -2308,10 +2307,10 @@ object TaskManager {
for (garbageCollector <- garbageCollectors) {
val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"")
gcGroup.gauge("Count", new FlinkGauge[Long] {
gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
override def getValue: Long = garbageCollector.getCollectionCount
})
gcGroup.gauge("Time", new FlinkGauge[Long] {
gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
override def getValue: Long = garbageCollector.getCollectionTime
})
}
......@@ -2320,24 +2319,24 @@ object TaskManager {
private def instantiateMemoryMetrics(metrics: MetricGroup) {
val mxBean = ManagementFactory.getMemoryMXBean
val heap = metrics.addGroup("Heap")
heap.gauge("Used", new FlinkGauge[Long] {
heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
})
heap.gauge("Committed", new FlinkGauge[Long] {
heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
})
heap.gauge("Max", new FlinkGauge[Long] {
heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
})
val nonHeap = metrics.addGroup("NonHeap")
nonHeap.gauge("Used", new FlinkGauge[Long] {
nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
})
nonHeap.gauge("Committed", new FlinkGauge[Long] {
nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
})
nonHeap.gauge("Max", new FlinkGauge[Long] {
nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
})
......@@ -2346,15 +2345,15 @@ object TaskManager {
val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
val direct = metrics.addGroup("Direct")
direct.gauge("Count", new FlinkGauge[Long] {
direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(directObjectName, "Count").asInstanceOf[Long]
})
direct.gauge("MemoryUsed", new FlinkGauge[Long] {
direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
})
direct.gauge("TotalCapacity", new FlinkGauge[Long] {
direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
})
......@@ -2362,15 +2361,15 @@ object TaskManager {
val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
val mapped = metrics.addGroup("Mapped")
mapped.gauge("Count", new FlinkGauge[Long] {
mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
})
mapped.gauge("MemoryUsed", new FlinkGauge[Long] {
mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
})
mapped.gauge("TotalCapacity", new FlinkGauge[Long] {
mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
})
......@@ -2379,8 +2378,7 @@ object TaskManager {
private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
val mxBean = ManagementFactory.getThreadMXBean
metrics
.gauge("Count", new FlinkGauge[Int] {
metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
override def getValue: Int = mxBean.getThreadCount
})
}
......@@ -2390,11 +2388,10 @@ object TaskManager {
val mxBean = ManagementFactory.getOperatingSystemMXBean
.asInstanceOf[com.sun.management.OperatingSystemMXBean]
metrics
.gauge("Load", new FlinkGauge[Double] {
metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] {
override def getValue: Double = mxBean.getProcessCpuLoad
})
metrics.gauge("Time", new FlinkGauge[Long] {
metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getProcessCpuTime
})
}
......
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
......@@ -119,7 +119,7 @@ public class InputChannelTest {
ResultPartitionID partitionId,
Tuple2<Integer, Integer> initialAndMaxBackoff) {
super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter());
super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new SimpleCounter());
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册