diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 7963e1a015468aaab10cd03e3b0662232f280a5f..e4b5161010f7fe0160a504e5f18315cce8e72540 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -424,14 +424,17 @@ class MyMapper extends RichMapFunction[Long,Long] { ## Scope -Every metric is assigned an identifier under which it will be reported that is based on 3 components: the user-provided name when registering the metric, an optional user-defined scope and a system-provided scope. +Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported. + +The identifier is based on 3 components: the user-defined name when registering the metric, an optional user-defined scope and a system-provided scope. For example, if `A.B` is the system scope, `C.D` the user scope and `E` the name, then the identifier for the metric will be `A.B.C.D.E`. You can configure which delimiter to use for the identifier (default: `.`) by setting the `metrics.scope.delimiter` key in `conf/flink-conf.yaml`. ### User Scope -You can define a user scope by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`. +You can define a user scope by calling `MetricGroup#addGroup(String name)`, `MetricGroup#addGroup(int name)` or `MetricGroup#addGroup(String key, String value)`. +These methods affect what `MetricGroup#getMetricIdentifier` and `MetricGroup#getScopeComponents` return.
@@ -442,6 +445,11 @@ counter = getRuntimeContext() .addGroup("MyMetrics") .counter("myCounter"); +counter = getRuntimeContext() + .getMetricGroup() + .addGroup("MyMetricsKey", "MyMetricsValue") + .counter("myCounter"); + {% endhighlight %}
@@ -453,6 +461,11 @@ counter = getRuntimeContext() .addGroup("MyMetrics") .counter("myCounter") +counter = getRuntimeContext() + .getMetricGroup() + .addGroup("MyMetricsKey", "MyMetricsValue") + .counter("myCounter") + {% endhighlight %}
@@ -508,6 +521,40 @@ or by assigning unique names to jobs and operators. **Important:** For the Batch API, <operator_id> is always equal to <task_id>. +### User Variables + +You can define a user variable by calling `MetricGroup#addGroup(String key, String value)`. +This method affects what `MetricGroup#getMetricIdentifier`, `MetricGroup#getScopeComponents` and `MetricGroup#getAllVariables()` returns. + +**Important:** User variables cannot be used in scope formats. + +{% highlight java %} + +
+
+{% highlight java %} + +counter = getRuntimeContext() + .getMetricGroup() + .addGroup("MyMetricsKey", "MyMetricsValue") + .counter("myCounter"); + +{% endhighlight %} +
+ +
+{% highlight scala %} + +counter = getRuntimeContext() + .getMetricGroup() + .addGroup("MyMetricsKey", "MyMetricsValue") + .counter("myCounter") + +{% endhighlight %} +
+ +
+ ## Reporter Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. These diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java index 39ab3b60f2c934050f623f2d27c4f6bfa6e4c0d3..f1f19813b1658b884c92bee39b41c8f52d4bfa5c 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java @@ -150,6 +150,19 @@ public interface MetricGroup { */ MetricGroup addGroup(String name); + /** + * Creates a new key-value MetricGroup pair. The key group is added to this group's sub-groups, while the value group + * is added to the key group's sub-groups. This method returns the value group. + * + *

The only difference between calling this method and {@code group.addGroup(key).addGroup(value)} is that + * {@link #getAllVariables()} of the value group returns an additional {@code "<key>"="value"} pair. + * + * @param key name of the first group + * @param value name of the second group + * @return the second created group + */ + MetricGroup addGroup(String key, String value); + // ------------------------------------------------------------------------ // Scope // ------------------------------------------------------------------------ diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java index ea11b436027411a305a9fb835f594d689b7db026..e004124f491832c0a1742ccb37ac03cebace9093 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java @@ -95,6 +95,11 @@ public class UnregisteredMetricsGroup implements MetricGroup { return new UnregisteredMetricsGroup(); } + @Override + public MetricGroup addGroup(String key, String value) { + return new UnregisteredMetricsGroup(); + } + @Override public String[] getScopeComponents() { return new String[0]; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 66eace588582a9382dc8e8a0a1b9c45e12b05e29..e6df3a47c0c884947b69791dca19e18192005f63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -113,10 +113,10 @@ public abstract class AbstractMetricGroup> impl if (variables == null) { 
// avoid synchronization for common case synchronized (this) { if (variables == null) { - if (parent != null) { - variables = parent.getAllVariables(); - } else { // this case should only be true for mock groups - variables = new HashMap<>(); + variables = new HashMap<>(); + putVariables(variables); + if (parent != null) { // not true for Job-/TaskManagerMetricGroup and mocks + variables.putAll(parent.getAllVariables()); } } } @@ -124,6 +124,14 @@ public abstract class AbstractMetricGroup> impl return variables; } + /** + * Enters all variables specific to this {@link AbstractMetricGroup} and their associated values into the map. + * + * @param variables map to enter variables and their values into + */ + protected void putVariables(Map variables) { + } + /** * Returns the logical scope of this group, for example * {@code "taskmanager.job.task"}. @@ -388,11 +396,20 @@ public abstract class AbstractMetricGroup> impl @Override public MetricGroup addGroup(int name) { - return addGroup(String.valueOf(name)); + return addGroup(String.valueOf(name), ChildType.GENERIC); } @Override public MetricGroup addGroup(String name) { + return addGroup(name, ChildType.GENERIC); + } + + @Override + public MetricGroup addGroup(String key, String value) { + return addGroup(key, ChildType.KEY).addGroup(value, ChildType.VALUE); + } + + private AbstractMetricGroup addGroup(String name, ChildType childType) { synchronized (this) { if (!closed) { // adding a group with the same name as a metric creates problems in many reporters/dashboards @@ -403,7 +420,7 @@ public abstract class AbstractMetricGroup> impl name + "'. Metric might not get properly reported. 
" + Arrays.toString(scopeComponents)); } - AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name); + AbstractMetricGroup newGroup = createChildGroup(name, childType); AbstractMetricGroup prior = groups.put(name, newGroup); if (prior == null) { // no prior group with that name @@ -422,4 +439,25 @@ public abstract class AbstractMetricGroup> impl } } } + + protected GenericMetricGroup createChildGroup(String name, ChildType childType) { + switch (childType) { + case KEY: + return new GenericKeyMetricGroup(registry, this, name); + default: + return new GenericMetricGroup(registry, this, name); + } + } + + /** + * Enum for indicating which child group should be created. + * `KEY` is used to create {@link GenericKeyMetricGroup}. + * `VALUE` is used to create {@link GenericValueMetricGroup}. + * `GENERIC` is used to create {@link GenericMetricGroup}. + */ + protected enum ChildType { + KEY, + VALUE, + GENERIC + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericKeyMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericKeyMetricGroup.java new file mode 100644 index 0000000000000000000000000000000000000000..09119355bcb8a9790eae478fc739c33a77407d63 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericKeyMetricGroup.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.MetricRegistry; + +/** + * A {@link GenericMetricGroup} for representing the key part of a key-value metric group pair. + * + * @see GenericValueMetricGroup + * @see MetricGroup#addGroup(String, String) + */ +@Internal +public class GenericKeyMetricGroup extends GenericMetricGroup { + + GenericKeyMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) { + super(registry, parent, name); + } + + @Override + public MetricGroup addGroup(String key, String value) { + return addGroup(key).addGroup(value); + } + + @Override + protected GenericMetricGroup createChildGroup(String name, ChildType childType) { + switch (childType) { + case VALUE: + return new GenericValueMetricGroup(registry, this, name); + default: + return new GenericMetricGroup(registry, this, name); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java new file mode 100644 index 0000000000000000000000000000000000000000..ef8e6e8bee8a2c7d2a97fd0d61ae2a43caa74aa7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.ScopeFormat; + +import java.util.Map; + +/** + * A {@link GenericMetricGroup} for representing the value part of a key-value metric group pair. 
+ * + * @see GenericKeyMetricGroup + * @see MetricGroup#addGroup(String, String) + */ +@Internal +public class GenericValueMetricGroup extends GenericMetricGroup { + private String key; + private final String value; + + GenericValueMetricGroup(MetricRegistry registry, GenericKeyMetricGroup parent, String value) { + super(registry, parent, value); + this.key = parent.getGroupName(name -> name); + this.value = value; + } + + // ------------------------------------------------------------------------ + + @Override + protected void putVariables(Map variables) { + variables.put(ScopeFormat.asVariable(this.key), value); + } + + @Override + public String getLogicalScope(CharacterFilter filter, char delimiter) { + return parent.getLogicalScope(filter, delimiter); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java index 2d499133f931b25bf288afe0073b43443c73910f..ea1ba418b1af8417802dbb9215cd5ca4e8c40acf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java @@ -102,6 +102,11 @@ public class ProxyMetricGroup

implements MetricGroup { return parentMetricGroup.addGroup(name); } + @Override + public final MetricGroup addGroup(String key, String value) { + return parentMetricGroup.addGroup(key, value); + } + @Override public String[] getScopeComponents() { return parentMetricGroup.getScopeComponents(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index a47b497b4859f13886b86bc5736fbc741493ac13..e7a1d7b20e9ae49e71be7fe176c6ccfc93960762 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -384,6 +384,11 @@ public class CheckpointStatsTrackerTest { throw new UnsupportedOperationException("Not expected in this test"); } + @Override + public MetricGroup addGroup(String key, String value) { + throw new UnsupportedOperationException("Not expected in this test"); + } + @Override public String[] getScopeComponents() { throw new UnsupportedOperationException("Not expected in this test"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 94760e614cb8ab33013efb4e69bc779aaf979515..0fced3316f96d714877c01dc8c425b98e3b3c1bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -27,7 +27,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import 
org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; @@ -39,6 +41,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -78,6 +81,141 @@ public class MetricGroupTest extends TestLogger { assertTrue(subgroup1 == subgroup2); } + /** + * Verifies the basic behavior when defining user-defined variables. + */ + @Test + public void testUserDefinedVariable() { + MetricRegistry registry = new NoOpMetricRegistry(); + GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); + + String key = "key"; + String value = "value"; + MetricGroup group = root.addGroup(key, value); + + String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key")); + assertEquals(value, variableValue); + + String identifier = group.getMetricIdentifier("metric"); + assertTrue("Key is missing from metric identifier.", identifier.contains("key")); + assertTrue("Value is missing from metric identifier.", identifier.contains("value")); + + String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter()); + assertTrue("Key is missing from logical scope.", logicalScope.contains(key)); + assertFalse("Value is present in logical scope.", logicalScope.contains(value)); + } + + /** + * Verifies that calling {@link MetricGroup#addGroup(String, String)} on a {@link GenericKeyMetricGroup} goes + * through the generic code path. 
+ */ + @Test + public void testUserDefinedVariableOnKeyGroup() { + MetricRegistry registry = new NoOpMetricRegistry(); + GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); + + String key1 = "key1"; + String value1 = "value1"; + root.addGroup(key1, value1); + + String key2 = "key2"; + String value2 = "value2"; + MetricGroup group = root.addGroup(key1).addGroup(key2, value2); + + String variableValue = group.getAllVariables().get("value2"); + assertNull(variableValue); + + String identifier = group.getMetricIdentifier("metric"); + assertTrue("Key1 is missing from metric identifier.", identifier.contains("key1")); + assertTrue("Key2 is missing from metric identifier.", identifier.contains("key2")); + assertTrue("Value2 is missing from metric identifier.", identifier.contains("value2")); + + String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter()); + assertTrue("Key1 is missing from logical scope.", logicalScope.contains(key1)); + assertTrue("Key2 is missing from logical scope.", logicalScope.contains(key2)); + assertTrue("Value2 is missing from logical scope.", logicalScope.contains(value2)); + } + + /** + * Verifies that calling {@link MetricGroup#addGroup(String, String)} if a generic group with the key name already + * exists goes through the generic code path. 
+ */ + @Test + public void testNameCollisionForKeyAfterGenericGroup() { + MetricRegistry registry = new NoOpMetricRegistry(); + GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); + + String key = "key"; + String value = "value"; + + root.addGroup(key); + MetricGroup group = root.addGroup(key, value); + + String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key")); + assertNull(variableValue); + + String identifier = group.getMetricIdentifier("metric"); + assertTrue("Key is missing from metric identifier.", identifier.contains("key")); + assertTrue("Value is missing from metric identifier.", identifier.contains("value")); + + String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter()); + assertTrue("Key is missing from logical scope.", logicalScope.contains(key)); + assertTrue("Value is missing from logical scope.", logicalScope.contains(value)); + } + + /** + * Verifies that calling {@link MetricGroup#addGroup(String, String)} if a generic group with the key and value name + * already exists goes through the generic code path. 
+ */ + @Test + public void testNameCollisionForKeyAndValueAfterGenericGroup() { + MetricRegistry registry = new NoOpMetricRegistry(); + GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); + + String key = "key"; + String value = "value"; + + root.addGroup(key).addGroup(value); + MetricGroup group = root.addGroup(key, value); + + String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key")); + assertNull(variableValue); + + String identifier = group.getMetricIdentifier("metric"); + assertTrue("Key is missing from metric identifier.", identifier.contains("key")); + assertTrue("Value is missing from metric identifier.", identifier.contains("value")); + + String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter()); + assertTrue("Key is missing from logical scope.", logicalScope.contains(key)); + assertTrue("Value is missing from logical scope.", logicalScope.contains(value)); + } + + /** + * Verifies that existing key/value groups are returned when calling {@link MetricGroup#addGroup(String)}. 
+ */ + @Test + public void testNameCollisionAfterKeyValueGroup() { + MetricRegistry registry = new NoOpMetricRegistry(); + GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); + + String key = "key"; + String value = "value"; + + root.addGroup(key, value); + MetricGroup group = root.addGroup(key).addGroup(value); + + String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key")); + assertEquals(value, variableValue); + + String identifier = group.getMetricIdentifier("metric"); + assertTrue("Key is missing from metric identifier.", identifier.contains("key")); + assertTrue("Value is missing from metric identifier.", identifier.contains("value")); + + String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter()); + assertTrue("Key is missing from logical scope.", logicalScope.contains(key)); + assertFalse("Value is present in logical scope.", logicalScope.contains(value)); + } + @Test public void closedGroupDoesNotRegisterMetrics() { GenericMetricGroup group = new GenericMetricGroup(