diff --git a/flink-metrics/flink-metrics-jmx/pom.xml b/flink-metrics/flink-metrics-jmx/pom.xml index 626f063f6a1d8621942f400a5d903ed584e98b55..8ee983bb4ba562046349908ec0e58563ffefe668 100644 --- a/flink-metrics/flink-metrics-jmx/pom.xml +++ b/flink-metrics/flink-metrics-jmx/pom.xml @@ -49,20 +49,20 @@ under the License. org.apache.flink - flink-metrics-core + flink-runtime_2.10 ${project.version} provided - - org.apache.flink - flink-runtime_2.10 + flink-metrics-core ${project.version} - test + provided + + org.apache.flink flink-runtime_2.10 diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java index 39a5aa29bf6c15fb24e175c7c1ed77252966798e..9c1fabb689313284cf745ae74acc7437b7bb69de 100644 --- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java +++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java @@ -18,6 +18,7 @@ package org.apache.flink.metrics.jmx; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; @@ -25,8 +26,9 @@ import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; - import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.util.NetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,7 @@ import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; import java.util.HashMap; +import java.util.Hashtable; import java.util.Iterator; import java.util.Map; @@ -59,13 +62,19 @@ import java.util.Map; */ public class JMXReporter implements MetricReporter { - private static final String PREFIX = "org.apache.flink.metrics:"; - private static final String KEY_PREFIX = "key"; + static final String JMX_DOMAIN_PREFIX = "org.apache.flink."; public static final String ARG_PORT = "port"; private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class); + private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return replaceInvalidChars(input); + } + }; + // ------------------------------------------------------------------------ /** The server where the management beans are registered and deregistered */ @@ -144,14 +153,19 @@ public class JMXReporter implements MetricReporter { @Override public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { - final String name = generateJmxName(metricName, group.getScopeComponents()); + final String domain = generateJmxDomain(metricName, group); + final Hashtable table = generateJmxTable(group.getAllVariables()); AbstractBean jmxMetric; ObjectName jmxName; try { - jmxName = new ObjectName(name); + jmxName = new ObjectName(domain, table); } catch (MalformedObjectNameException e) { - LOG.error("Metric name did not conform to JMX ObjectName rules: " + name, e); + /** + * There is an implementation error on our side if this occurs. Either the domain was modified and no longer + * conforms to the JMX domain rules or the table wasn't properly generated. + */ + LOG.debug("Implementation error. The domain or table does not conform to JMX rules." , e); return; } @@ -176,12 +190,11 @@ public class JMXReporter implements MetricReporter { } } catch (NotCompliantMBeanException e) { // implementation error on our side - LOG.error("Metric did not comply with JMX MBean naming rules.", e); + LOG.debug("Metric did not comply with JMX MBean rules.", e); } catch (InstanceAlreadyExistsException e) { - LOG.debug("A metric with the name " + jmxName + " was already registered.", e); - LOG.error("A metric with the name " + jmxName + " was already registered."); + LOG.warn("A metric with the name " + jmxName + " was already registered.", e); } catch (Throwable t) { - LOG.error("Failed to register metric", t); + LOG.warn("Failed to register metric", t); } } @@ -209,27 +222,18 @@ public class JMXReporter implements MetricReporter { // Utilities // ------------------------------------------------------------------------ - static String generateJmxName(String metricName, String[] scopeComponents) { - final StringBuilder nameBuilder = new StringBuilder(128); - nameBuilder.append(PREFIX); - - for (int x = 0; x < scopeComponents.length; x++) { - // write keyX= - nameBuilder.append(KEY_PREFIX); - nameBuilder.append(x); - nameBuilder.append("="); - - // write scope component - nameBuilder.append(replaceInvalidChars(scopeComponents[x])); - nameBuilder.append(","); + static Hashtable generateJmxTable(Map variables) { + Hashtable ht = new Hashtable<>(variables.size()); + for (Map.Entry variable : variables.entrySet()) { + ht.put(replaceInvalidChars(variable.getKey()), replaceInvalidChars(variable.getValue())); } + return ht; + } - // write the name - nameBuilder.append("name=").append(replaceInvalidChars(metricName)); - - return nameBuilder.toString(); + static String generateJmxDomain(String metricName, MetricGroup group) { + return JMX_DOMAIN_PREFIX + ((FrontMetricGroup>) group).getLogicalScope(CHARACTER_FILTER, '.') + '.' + metricName; } - + /** * Lightweight method to replace unsupported characters. * If the string does not contain any unsupported characters, this method creates no @@ -251,6 +255,8 @@ public class JMXReporter implements MetricReporter { for (int i = 0; i < strLen; i++) { final char c = str.charAt(i); switch (c) { + case '>': + case '<': case '"': // remove character by not moving cursor if (chars == null) { diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index 8968587b64c3f06b19ec534bdc0ba7f52bab7510..1a9628766101d3d812ce1e97ebc8993d162be4fe 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -25,6 +25,7 @@ import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.util.TestReporter; import org.apache.flink.runtime.metrics.util.TestingHistogram; @@ -40,8 +41,12 @@ import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.Hashtable; import java.util.List; +import java.util.Map; +import static org.apache.flink.metrics.jmx.JMXReporter.JMX_DOMAIN_PREFIX; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -65,14 +70,20 @@ public class JMXReporterTest extends TestLogger { } /** - * Verifies that the JMXReporter properly generates the JMX name. + * Verifies that the JMXReporter properly generates the JMX table. */ @Test - public void testGenerateName() { - String[] scope = {"value0", "value1", "\"value2 (test),=;:?'"}; - String jmxName = JMXReporter.generateJmxName("TestMetric", scope); + public void testGenerateTable() { + Map vars = new HashMap<>(); + vars.put("key0", "value0"); + vars.put("key1", "value1"); + vars.put("\"key2,=;:?'", "\"value2 (test),=;:?'"); - assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric", jmxName); + Hashtable jmxTable = JMXReporter.generateJmxTable(vars); + + assertEquals("value0", jmxTable.get("key0")); + assertEquals("value1", jmxTable.get("key1")); + assertEquals("value2_(test)------", jmxTable.get("key2------")); } /** @@ -102,28 +113,34 @@ public class JMXReporterTest extends TestLogger { MetricReporter rep1 = reporters.get(0); MetricReporter rep2 = reporters.get(1); - rep1.notifyOfAddedMetric(new Gauge() { + Gauge g1 = new Gauge() { @Override public Integer getValue() { return 1; } - }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm")); - - rep2.notifyOfAddedMetric(new Gauge() { + }; + Gauge g2 = new Gauge() { @Override public Integer getValue() { return 2; } - }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm")); + }; + + rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm"))); + rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm"))); MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents())); - ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents())); + ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables())); + ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables())); assertEquals(1, mBeanServer.getAttribute(objectName1, "Value")); assertEquals(2, mBeanServer.getAttribute(objectName2, "Value")); + rep1.notifyOfRemovedMetric(g1, "rep1", null); + rep1.notifyOfRemovedMetric(g2, "rep2", null); + + mg.close(); reg.shutdown(); } @@ -156,22 +173,25 @@ public class JMXReporterTest extends TestLogger { MetricReporter rep1 = reporters.get(0); MetricReporter rep2 = reporters.get(1); - rep1.notifyOfAddedMetric(new Gauge() { + Gauge g1 = new Gauge() { @Override public Integer getValue() { return 1; } - }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm")); - - rep2.notifyOfAddedMetric(new Gauge() { + }; + Gauge g2 = new Gauge() { @Override public Integer getValue() { return 2; } - }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm")); + }; - ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents())); - ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents())); + rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm"))); + + rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(1, new TaskManagerMetricGroup(reg, "host", "tm"))); + + ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables())); + ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables())); JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jmxrmi"); JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1); @@ -189,10 +209,14 @@ public class JMXReporterTest extends TestLogger { assertEquals(1, mCon2.getAttribute(objectName1, "Value")); assertEquals(2, mCon2.getAttribute(objectName2, "Value")); + rep1.notifyOfRemovedMetric(g1, "rep1", null); + rep1.notifyOfRemovedMetric(g2, "rep2", null); + jmxCon2.close(); rep1.close(); rep2.close(); + mg.close(); reg.shutdown(); } @@ -219,7 +243,7 @@ public class JMXReporterTest extends TestLogger { MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents())); + ObjectName objectName = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager." + histogramName, JMXReporter.generateJmxTable(metricGroup.getAllVariables())); MBeanInfo info = mBeanServer.getMBeanInfo(objectName); @@ -269,7 +293,7 @@ public class JMXReporterTest extends TestLogger { MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(meterName, metricGroup.getScopeComponents())); + ObjectName objectName = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager." + meterName, JMXReporter.generateJmxTable(metricGroup.getAllVariables())); MBeanInfo info = mBeanServer.getMBeanInfo(objectName); diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 0954241395acaafb6713e3e8a3d7c84b7639066d..3ae224f1574d2277175ec6862adc6605df0dbd78 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.junit.Assert; import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Future; @@ -38,6 +39,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import java.lang.management.ManagementFactory; import java.util.Collections; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -83,8 +85,9 @@ public class JMXJobManagerMetricTest { Await.ready(jobRunning, deadline.timeLeft()); MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - ObjectName objectName1 = new ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize"); - assertEquals(-1L, mBeanServer.getAttribute(objectName1, "Value")); + Set nameSet = mBeanServer.queryNames(new ObjectName("org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*"), null); + Assert.assertEquals(1, nameSet.size()); + assertEquals(-1L, mBeanServer.getAttribute(nameSet.iterator().next(), "Value")); Future jobFinished = flink.getLeaderGateway(deadline.timeLeft()) .ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 907c6558f13ad54ba579b4e206d2220ea45fcd1f..04b8158e6f243c5822f5803b60f0903ef7a3d461 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -89,6 +89,10 @@ public abstract class AbstractMetricGroup> impl * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */ private String scopeString; + /** The logical metrics scope represented by this group, as a concatenated string, lazily computed. + * For example: "taskmanager.job.task" */ + private String logicalScopeString; + /** The metrics query service scope represented by this group, lazily computed. */ protected QueryScopeInfo queryServiceScopeInfo; @@ -118,6 +122,43 @@ public abstract class AbstractMetricGroup> impl return variables; } + /** + * Returns the logical scope of this group, for example + * {@code "taskmanager.job.task"} + * + * @param filter character filter which is applied to the scope components + * @return logical scope + */ + public String getLogicalScope(CharacterFilter filter) { + return getLogicalScope(filter, registry.getDelimiter()); + } + + /** + * Returns the logical scope of this group, for example + * {@code "taskmanager.job.task"} + * + * @param filter character filter which is applied to the scope components + * @return logical scope + */ + public String getLogicalScope(CharacterFilter filter, char delimiter) { + if (logicalScopeString == null) { + if (parent == null) { + logicalScopeString = getGroupName(filter); + } else { + logicalScopeString = parent.getLogicalScope(filter, delimiter) + delimiter + getGroupName(filter); + } + } + return logicalScopeString; + } + + /** + * Returns the name for this group, meaning what kind of entity it represents, for example "taskmanager". + * + * @param filter character filter which is applied to the name + * @return logical name for this group + */ + protected abstract String getGroupName(CharacterFilter filter); + /** * Gets the scope as an array of the scope components, for example * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java index 885e6d6db193d7e60e08730c19d203b32f7432a0..63842fef9d66e56f9f7e1dfe3b675adb7fe27f1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java @@ -46,4 +46,12 @@ public class FrontMetricGroup

> extends ProxyMet public String getMetricIdentifier(String metricName, CharacterFilter filter) { return parentMetricGroup.getMetricIdentifier(metricName, filter, this.reporterIndex); } + + public String getLogicalScope(CharacterFilter filter) { + return parentMetricGroup.getLogicalScope(filter); + } + + public String getLogicalScope(CharacterFilter filter, char delimiter) { + return parentMetricGroup.getLogicalScope(filter, delimiter); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java index 54fbed46765bd5af893cd60f47353e0d3f1bc8d9..5978f2d950f3644c38d75d7fed229a45109aadbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java @@ -56,4 +56,9 @@ public class GenericMetricGroup extends AbstractMetricGroup subComponents() { return jobs.values(); } + + @Override + protected String getGroupName(CharacterFilter filter) { + return "jobmanager"; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java index 091807f5bcc7196dcc411b4920f6f8fdf4052ae2..17f6189102299c75e5fe64dfac3b71ad694f0ef4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java @@ -81,4 +81,9 @@ public abstract class JobMetricGroup> extends variables.put(ScopeFormat.SCOPE_JOB_ID, jobId.toString()); variables.put(ScopeFormat.SCOPE_JOB_NAME, jobName); } + + @Override + protected String getGroupName(CharacterFilter filter) { + return "job"; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java index 0c823ea8ab546fe648baa25dfbf2124a07f2f170..37c9dd83004005591fa279130dc8201ef0eb4ef1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java @@ -83,4 +83,9 @@ public class OperatorMetricGroup extends ComponentMetricGroup { protected Iterable subComponents() { return Collections.emptyList(); } + + @Override + protected String getGroupName(CharacterFilter filter) { + return "operator"; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java index a8fa82825610a2ec934e780c434c54986705c469..c3ca5fe1ea6e07b40bc82fc5a5d7e22a85421fae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java @@ -135,5 +135,10 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup subComponents() { return jobs.values(); } + + @Override + protected String getGroupName(CharacterFilter filter) { + return "taskmanager"; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index 75b8bd86cc5b71dedde9dcdb29192344156a72f3..43e8e1ba0c4b51b1d13c3d0ab9d14733a23dd821 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -169,4 +169,9 @@ public class TaskMetricGroup extends ComponentMetricGroup subComponents() { return operators.values(); } + + @Override + protected String getGroupName(CharacterFilter filter) { + return "task"; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index ca3810a81b85c694fa3ada1d1669f876691cecb6..c7b392fa7df6bac0d448aa04c5cc8de6d9a6cf6f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -39,6 +39,11 @@ public class AbstractMetricGroupTest { protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { return null; } + + @Override + protected String getGroupName(CharacterFilter filter) { + return ""; + } }; assertTrue(group.getAllVariables().isEmpty()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 8a1a006bf595069da73ad574aa78c00c2ebc6f01..665abb1defb8c3e24c3a8f29d01b41e793c08ec1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -176,6 +176,11 @@ public class MetricGroupTest extends TestLogger { return null; } + @Override + protected String getGroupName(CharacterFilter filter) { + return ""; + } + @Override protected void addMetric(String name, Metric metric) {} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 7b06cfd95936e1dea420562d08b73f4d5baf96a4..aa7ea49da646b0c4c04ab8709f9b71aa0c9dcb44 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -1504,13 +1504,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // connect to JMX MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); // wait until we've found all 5 offset metrics - Set offsetMetrics = mBeanServer.queryNames(new ObjectName("*:key7=current-offsets,*"), null); + Set offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null); while (offsetMetrics.size() < 5) { // test will time out if metrics are not properly working if (error.f0 != null) { // fail test early throw error.f0; } - offsetMetrics = mBeanServer.queryNames(new ObjectName("*:key7=current-offsets,*"), null); + offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null); Thread.sleep(50); } Assert.assertEquals(5, offsetMetrics.size()); @@ -1534,7 +1534,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } // check if producer metrics are also available. - Set producerMetrics = mBeanServer.queryNames(new ObjectName("*:key6=KafkaProducer,*"), null); + Set producerMetrics = mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null); Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30);