提交 784dbbee 编写于 作者: T Tony Wei 提交者: zentol

[FLINK-7692][metrics] Support user-defined variables

This closes #5115.
上级 30fc069a
......@@ -424,14 +424,17 @@ class MyMapper extends RichMapFunction[Long,Long] {
## Scope
Every metric is assigned an identifier under which it will be reported that is based on 3 components: the user-provided name when registering the metric, an optional user-defined scope and a system-provided scope.
Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported.
THe identifier is based on 3 components: the user-defined name when registering the metric, an optional user-defined scope and a system-provided scope.
For example, if `A.B` is the system scope, `C.D` the user scope and `E` the name, then the identifier for the metric will be `A.B.C.D.E`.
You can configure which delimiter to use for the identifier (default: `.`) by setting the `metrics.scope.delimiter` key in `conf/flink-conf.yaml`.
### User Scope
You can define a user scope by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
You can define a user scope by calling `MetricGroup#addGroup(String name)`, `MetricGroup#addGroup(int name)` or `Metric#addGroup(String key, String value)`.
These methods affect what `MetricGroup#getMetricIdentifier` and `MetricGroup#getScopeComponents` return.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
......@@ -442,6 +445,11 @@ counter = getRuntimeContext()
.addGroup("MyMetrics")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
{% endhighlight %}
</div>
......@@ -453,6 +461,11 @@ counter = getRuntimeContext()
.addGroup("MyMetrics")
.counter("myCounter")
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
{% endhighlight %}
</div>
......@@ -508,6 +521,40 @@ or by assigning unique names to jobs and operators.
**Important:** For the Batch API, &lt;operator_id&gt; is always equal to &lt;task_id&gt;.
### User Variables
You can define a user variable by calling `MetricGroup#addGroup(String key, String value)`.
This method affects what `MetricGroup#getMetricIdentifier`, `MetricGroup#getScopeComponents` and `MetricGroup#getAllVariables()` returns.
**Important:** User variables cannot be used in scope formats.
{% highlight java %}
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
{% endhighlight %}
</div>
</div>
## Reporter
Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. These
......
......@@ -150,6 +150,19 @@ public interface MetricGroup {
*/
MetricGroup addGroup(String name);
/**
* Creates a new key-value MetricGroup pair. The key group is added to this groups sub-groups, while the value group
* is added to the key group's sub-groups. This method returns the value group.
*
* <p>The only difference between calling this method and {@code group.addGroup(key).addGroup(value)} is that
* {@link #getAllVariables()} of the value group return an additional {@code "<key>"="value"} pair.
*
* @param key name of the first group
* @param value name of the second group
* @return the second created group
*/
MetricGroup addGroup(String key, String value);
// ------------------------------------------------------------------------
// Scope
// ------------------------------------------------------------------------
......
......@@ -95,6 +95,11 @@ public class UnregisteredMetricsGroup implements MetricGroup {
return new UnregisteredMetricsGroup();
}
@Override
public MetricGroup addGroup(String key, String value) {
return new UnregisteredMetricsGroup();
}
@Override
public String[] getScopeComponents() {
return new String[0];
......
......@@ -113,10 +113,10 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
if (variables == null) { // avoid synchronization for common case
synchronized (this) {
if (variables == null) {
if (parent != null) {
variables = parent.getAllVariables();
} else { // this case should only be true for mock groups
variables = new HashMap<>();
variables = new HashMap<>();
putVariables(variables);
if (parent != null) { // not true for Job-/TaskManagerMetricGroup and mocks
variables.putAll(parent.getAllVariables());
}
}
}
......@@ -124,6 +124,14 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
return variables;
}
/**
* Enters all variables specific to this {@link AbstractMetricGroup} and their associated values into the map.
*
* @param variables map to enter variables and their values into
*/
protected void putVariables(Map<String, String> variables) {
}
/**
* Returns the logical scope of this group, for example
* {@code "taskmanager.job.task"}.
......@@ -388,11 +396,20 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
@Override
public MetricGroup addGroup(int name) {
return addGroup(String.valueOf(name));
return addGroup(String.valueOf(name), ChildType.GENERIC);
}
@Override
public MetricGroup addGroup(String name) {
return addGroup(name, ChildType.GENERIC);
}
@Override
public MetricGroup addGroup(String key, String value) {
return addGroup(key, ChildType.KEY).addGroup(value, ChildType.VALUE);
}
private AbstractMetricGroup<?> addGroup(String name, ChildType childType) {
synchronized (this) {
if (!closed) {
// adding a group with the same name as a metric creates problems in many reporters/dashboards
......@@ -403,7 +420,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
}
AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name);
AbstractMetricGroup newGroup = createChildGroup(name, childType);
AbstractMetricGroup prior = groups.put(name, newGroup);
if (prior == null) {
// no prior group with that name
......@@ -422,4 +439,25 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
}
}
}
protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
switch (childType) {
case KEY:
return new GenericKeyMetricGroup(registry, this, name);
default:
return new GenericMetricGroup(registry, this, name);
}
}
/**
* Enum for indicating which child group should be created.
* `KEY` is used to create {@link GenericKeyMetricGroup}.
* `VALUE` is used to create {@link GenericValueMetricGroup}.
* `GENERIC` is used to create {@link GenericMetricGroup}.
*/
protected enum ChildType {
KEY,
VALUE,
GENERIC
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.MetricRegistry;
/**
* A {@link GenericMetricGroup} for representing the key part of a key-value metric group pair.
*
* @see GenericValueMetricGroup
* @see MetricGroup#addGroup(String, String)
*/
@Internal
public class GenericKeyMetricGroup extends GenericMetricGroup {
GenericKeyMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
super(registry, parent, name);
}
@Override
public MetricGroup addGroup(String key, String value) {
return addGroup(key).addGroup(value);
}
@Override
protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
switch (childType) {
case VALUE:
return new GenericValueMetricGroup(registry, this, name);
default:
return new GenericMetricGroup(registry, this, name);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import java.util.Map;
/**
* A {@link GenericMetricGroup} for representing the value part of a key-value metric group pair.
*
* @see GenericKeyMetricGroup
* @see MetricGroup#addGroup(String, String)
*/
@Internal
public class GenericValueMetricGroup extends GenericMetricGroup {
private String key;
private final String value;
GenericValueMetricGroup(MetricRegistry registry, GenericKeyMetricGroup parent, String value) {
super(registry, parent, value);
this.key = parent.getGroupName(name -> name);
this.value = value;
}
// ------------------------------------------------------------------------
@Override
protected void putVariables(Map<String, String> variables) {
variables.put(ScopeFormat.asVariable(this.key), value);
}
@Override
public String getLogicalScope(CharacterFilter filter, char delimiter) {
return parent.getLogicalScope(filter, delimiter);
}
}
......@@ -102,6 +102,11 @@ public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
return parentMetricGroup.addGroup(name);
}
@Override
public final MetricGroup addGroup(String key, String value) {
return parentMetricGroup.addGroup(key, value);
}
@Override
public String[] getScopeComponents() {
return parentMetricGroup.getScopeComponents();
......
......@@ -384,6 +384,11 @@ public class CheckpointStatsTrackerTest {
throw new UnsupportedOperationException("Not expected in this test");
}
@Override
public MetricGroup addGroup(String key, String value) {
throw new UnsupportedOperationException("Not expected in this test");
}
@Override
public String[] getScopeComponents() {
throw new UnsupportedOperationException("Not expected in this test");
......
......@@ -27,7 +27,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
......@@ -39,6 +41,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
......@@ -78,6 +81,141 @@ public class MetricGroupTest extends TestLogger {
assertTrue(subgroup1 == subgroup2);
}
/**
* Verifies the basic behavior when defining user-defined variables.
*/
@Test
public void testUserDefinedVariable() {
MetricRegistry registry = new NoOpMetricRegistry();
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
String value = "value";
MetricGroup group = root.addGroup(key, value);
String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key"));
assertEquals(value, variableValue);
String identifier = group.getMetricIdentifier("metric");
assertTrue("Key is missing from metric identifier.", identifier.contains("key"));
assertTrue("Value is missing from metric identifier.", identifier.contains("value"));
String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
assertTrue("Key is missing from logical scope.", logicalScope.contains(key));
assertFalse("Value is present in logical scope.", logicalScope.contains(value));
}
/**
* Verifies that calling {@link MetricGroup#addGroup(String, String)} on a {@link GenericKeyMetricGroup} goes
* through the generic code path.
*/
@Test
public void testUserDefinedVariableOnKeyGroup() {
MetricRegistry registry = new NoOpMetricRegistry();
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key1 = "key1";
String value1 = "value1";
root.addGroup(key1, value1);
String key2 = "key2";
String value2 = "value2";
MetricGroup group = root.addGroup(key1).addGroup(key2, value2);
String variableValue = group.getAllVariables().get("value2");
assertNull(variableValue);
String identifier = group.getMetricIdentifier("metric");
assertTrue("Key1 is missing from metric identifier.", identifier.contains("key1"));
assertTrue("Key2 is missing from metric identifier.", identifier.contains("key2"));
assertTrue("Value2 is missing from metric identifier.", identifier.contains("value2"));
String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
assertTrue("Key1 is missing from logical scope.", logicalScope.contains(key1));
assertTrue("Key2 is missing from logical scope.", logicalScope.contains(key2));
assertTrue("Value2 is missing from logical scope.", logicalScope.contains(value2));
}
/**
* Verifies that calling {@link MetricGroup#addGroup(String, String)} if a generic group with the key name already
* exists goes through the generic code path.
*/
@Test
public void testNameCollisionForKeyAfterGenericGroup() {
MetricRegistry registry = new NoOpMetricRegistry();
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
String value = "value";
root.addGroup(key);
MetricGroup group = root.addGroup(key, value);
String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key"));
assertNull(variableValue);
String identifier = group.getMetricIdentifier("metric");
assertTrue("Key is missing from metric identifier.", identifier.contains("key"));
assertTrue("Value is missing from metric identifier.", identifier.contains("value"));
String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
assertTrue("Key is missing from logical scope.", logicalScope.contains(key));
assertTrue("Value is missing from logical scope.", logicalScope.contains(value));
}
/**
* Verifies that calling {@link MetricGroup#addGroup(String, String)} if a generic group with the key and value name
* already exists goes through the generic code path.
*/
@Test
public void testNameCollisionForKeyAndValueAfterGenericGroup() {
MetricRegistry registry = new NoOpMetricRegistry();
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
String value = "value";
root.addGroup(key).addGroup(value);
MetricGroup group = root.addGroup(key, value);
String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key"));
assertNull(variableValue);
String identifier = group.getMetricIdentifier("metric");
assertTrue("Key is missing from metric identifier.", identifier.contains("key"));
assertTrue("Value is missing from metric identifier.", identifier.contains("value"));
String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
assertTrue("Key is missing from logical scope.", logicalScope.contains(key));
assertTrue("Value is missing from logical scope.", logicalScope.contains(value));
}
/**
* Verifies that existing key/value groups are returned when calling {@link MetricGroup#addGroup(String)}.
*/
@Test
public void testNameCollisionAfterKeyValueGroup() {
MetricRegistry registry = new NoOpMetricRegistry();
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
String value = "value";
root.addGroup(key, value);
MetricGroup group = root.addGroup(key).addGroup(value);
String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key"));
assertEquals(value, variableValue);
String identifier = group.getMetricIdentifier("metric");
assertTrue("Key is missing from metric identifier.", identifier.contains("key"));
assertTrue("Value is missing from metric identifier.", identifier.contains("value"));
String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
assertTrue("Key is missing from logical scope.", logicalScope.contains(key));
assertFalse("Value is present in logical scope.", logicalScope.contains(value));
}
@Test
public void closedGroupDoesNotRegisterMetrics() {
GenericMetricGroup group = new GenericMetricGroup(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册