提交 e4fe89d6 编写于 作者: Z zentol

[FLINK-4210][metrics] Move close()/isClosed() out of MetricGroup

This closes #2286
上级 0b4c04d7
...@@ -27,34 +27,10 @@ import org.apache.flink.annotation.PublicEvolving; ...@@ -27,34 +27,10 @@ import org.apache.flink.annotation.PublicEvolving;
* hierarchy based on the group names. * hierarchy based on the group names.
* *
* <p>A MetricGroup is uniquely identified by it's place in the hierarchy and name. * <p>A MetricGroup is uniquely identified by it's place in the hierarchy and name.
*
* <p>Metrics groups can be {@link #close() closed}. Upon closing, the group de-register all metrics
* from any metrics reporter and any internal maps. Note that even closed metrics groups
* return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
* These metrics simply do not get reported any more, when created on a closed group.
*/ */
@PublicEvolving @PublicEvolving
public interface MetricGroup { public interface MetricGroup {
// ------------------------------------------------------------------------
// Closing
// ------------------------------------------------------------------------
/**
* Marks the group as closed.
* Recursively unregisters all {@link Metric Metrics} contained in this group.
*
* <p>Any metrics created after the call to this function will not be registered in
* the {@link MetricRegistry} and not be reported to any reporter (like JMX).
*/
void close();
/**
* Checks whether this MetricGroup has been closed.
* @return True if the group has been closed, false is the group is still open.
*/
boolean isClosed();
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Metrics // Metrics
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
......
...@@ -49,6 +49,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; ...@@ -49,6 +49,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* be strictly from parent group to subgroup. If at any point, a subgroup holds its group * be strictly from parent group to subgroup. If at any point, a subgroup holds its group
* lock and calls a parent method that also acquires the lock, it will create a deadlock * lock and calls a parent method that also acquires the lock, it will create a deadlock
* condition. * condition.
*
* <p>An AbstractMetricGroup can be {@link #close() closed}. Upon closing, the group de-register all metrics
* from any metrics reporter and any internal maps. Note that even closed metrics groups
* return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
* These metrics simply do not get reported any more, when created on a closed group.
*/ */
@Internal @Internal
public abstract class AbstractMetricGroup implements MetricGroup { public abstract class AbstractMetricGroup implements MetricGroup {
...@@ -65,7 +70,7 @@ public abstract class AbstractMetricGroup implements MetricGroup { ...@@ -65,7 +70,7 @@ public abstract class AbstractMetricGroup implements MetricGroup {
private final Map<String, Metric> metrics = new HashMap<>(); private final Map<String, Metric> metrics = new HashMap<>();
/** All metric subgroups of this group */ /** All metric subgroups of this group */
private final Map<String, MetricGroup> groups = new HashMap<>(); private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
/** The metrics scope represented by this group. /** The metrics scope represented by this group.
* For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */ * For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */
...@@ -132,14 +137,13 @@ public abstract class AbstractMetricGroup implements MetricGroup { ...@@ -132,14 +137,13 @@ public abstract class AbstractMetricGroup implements MetricGroup {
// Closing // Closing
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
@Override
public void close() { public void close() {
synchronized (this) { synchronized (this) {
if (!closed) { if (!closed) {
closed = true; closed = true;
// close all subgroups // close all subgroups
for (MetricGroup group : groups.values()) { for (AbstractMetricGroup group : groups.values()) {
group.close(); group.close();
} }
groups.clear(); groups.clear();
...@@ -153,7 +157,6 @@ public abstract class AbstractMetricGroup implements MetricGroup { ...@@ -153,7 +157,6 @@ public abstract class AbstractMetricGroup implements MetricGroup {
} }
} }
@Override
public final boolean isClosed() { public final boolean isClosed() {
return closed; return closed;
} }
...@@ -267,8 +270,8 @@ public abstract class AbstractMetricGroup implements MetricGroup { ...@@ -267,8 +270,8 @@ public abstract class AbstractMetricGroup implements MetricGroup {
name + "'. Metric might not get properly reported. (" + scopeString + ')'); name + "'. Metric might not get properly reported. (" + scopeString + ')');
} }
MetricGroup newGroup = new GenericMetricGroup(registry, this, name); AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name);
MetricGroup prior = groups.put(name, newGroup); AbstractMetricGroup prior = groups.put(name, newGroup);
if (prior == null) { if (prior == null) {
// no prior group with that name // no prior group with that name
return newGroup; return newGroup;
......
...@@ -38,16 +38,6 @@ public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup { ...@@ -38,16 +38,6 @@ public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
this.parentMetricGroup = Preconditions.checkNotNull(parentMetricGroup); this.parentMetricGroup = Preconditions.checkNotNull(parentMetricGroup);
} }
@Override
public final void close() {
// don't close the parent metric group because it can also contain other metrics
}
@Override
public final boolean isClosed() {
return parentMetricGroup.isClosed();
}
@Override @Override
public final Counter counter(int name) { public final Counter counter(int name) {
return parentMetricGroup.counter(name); return parentMetricGroup.counter(name);
......
...@@ -28,20 +28,10 @@ import org.apache.flink.metrics.SimpleCounter; ...@@ -28,20 +28,10 @@ import org.apache.flink.metrics.SimpleCounter;
/** /**
* A special {@link MetricGroup} that does not register any metrics at the metrics registry * A special {@link MetricGroup} that does not register any metrics at the metrics registry
* and any reporters. * and any reporters.
*
* <p>This metrics group appears always closed ({@link #isClosed()}).
*/ */
@Internal @Internal
public class UnregisteredMetricsGroup implements MetricGroup { public class UnregisteredMetricsGroup implements MetricGroup {
@Override
public void close() {}
@Override
public boolean isClosed() {
return true;
}
@Override @Override
public Counter counter(int name) { public Counter counter(int name) {
return new SimpleCounter(); return new SimpleCounter();
......
...@@ -131,7 +131,7 @@ public class MetricRegistryTest extends TestLogger { ...@@ -131,7 +131,7 @@ public class MetricRegistryTest extends TestLogger {
MetricRegistry registry = new MetricRegistry(config); MetricRegistry registry = new MetricRegistry(config);
MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
root.counter("rootCounter"); root.counter("rootCounter");
root.close(); root.close();
......
...@@ -26,7 +26,6 @@ import org.apache.flink.metrics.MetricRegistry; ...@@ -26,7 +26,6 @@ import org.apache.flink.metrics.MetricRegistry;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
...@@ -88,7 +87,7 @@ public class MetricGroupTest { ...@@ -88,7 +87,7 @@ public class MetricGroupTest {
group.close(); group.close();
assertTrue(group.isClosed()); assertTrue(group.isClosed());
MetricGroup subgroup = group.addGroup("test subgroup"); AbstractMetricGroup subgroup = (AbstractMetricGroup) group.addGroup("test subgroup");
assertTrue(subgroup.isClosed()); assertTrue(subgroup.isClosed());
} }
......
...@@ -131,15 +131,11 @@ public class TaskMetricGroupTest { ...@@ -131,15 +131,11 @@ public class TaskMetricGroupTest {
TaskManagerJobMetricGroup taskManagerJobMetricGroup = new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job"); TaskManagerJobMetricGroup taskManagerJobMetricGroup = new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job");
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, taskManagerJobMetricGroup, new AbstractID(), new AbstractID(), "task", 0, 0); TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, taskManagerJobMetricGroup, new AbstractID(), new AbstractID(), "task", 0, 0);
IOMetricGroup ioMetricGroup = taskMetricGroup.getIOMetricGroup();
// the io metric should have registered predefined metrics // the io metric should have registered predefined metrics
assertTrue(registry.getNumberRegisteredMetrics() > 0); assertTrue(registry.getNumberRegisteredMetrics() > 0);
taskMetricGroup.close(); taskMetricGroup.close();
assertTrue(ioMetricGroup.isClosed());
// now alle registered metrics should have been unregistered // now alle registered metrics should have been unregistered
assertEquals(0, registry.getNumberRegisteredMetrics()); assertEquals(0, registry.getNumberRegisteredMetrics());
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册