提交 94d1b63c 编写于 作者: Z zentol

[FLINK-4245] JMXReporter exposes all defined variables

This closes #2418.
上级 344fe94d
......@@ -49,20 +49,20 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<artifactId>flink-runtime_2.10</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
......
......@@ -18,6 +18,7 @@
package org.apache.flink.metrics.jmx;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
......@@ -25,8 +26,9 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -48,6 +50,7 @@ import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
......@@ -59,13 +62,19 @@ import java.util.Map;
*/
public class JMXReporter implements MetricReporter {
private static final String PREFIX = "org.apache.flink.metrics:";
private static final String KEY_PREFIX = "key";
static final String JMX_DOMAIN_PREFIX = "org.apache.flink.";
public static final String ARG_PORT = "port";
private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class);
/**
 * Character filter that delegates to {@code replaceInvalidChars(String)}; it is handed to the
 * metric group when the logical scope is requested for the JMX domain.
 */
private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
@Override
public String filterCharacters(String input) {
return replaceInvalidChars(input);
}
};
// ------------------------------------------------------------------------
/** The server where the management beans are registered and deregistered */
......@@ -144,14 +153,19 @@ public class JMXReporter implements MetricReporter {
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
final String name = generateJmxName(metricName, group.getScopeComponents());
final String domain = generateJmxDomain(metricName, group);
final Hashtable<String, String> table = generateJmxTable(group.getAllVariables());
AbstractBean jmxMetric;
ObjectName jmxName;
try {
jmxName = new ObjectName(name);
jmxName = new ObjectName(domain, table);
} catch (MalformedObjectNameException e) {
LOG.error("Metric name did not conform to JMX ObjectName rules: " + name, e);
/**
* There is an implementation error on our side if this occurs. Either the domain was modified and no longer
* conforms to the JMX domain rules or the table wasn't properly generated.
*/
LOG.debug("Implementation error. The domain or table does not conform to JMX rules." , e);
return;
}
......@@ -176,12 +190,11 @@ public class JMXReporter implements MetricReporter {
}
} catch (NotCompliantMBeanException e) {
// implementation error on our side
LOG.error("Metric did not comply with JMX MBean naming rules.", e);
LOG.debug("Metric did not comply with JMX MBean rules.", e);
} catch (InstanceAlreadyExistsException e) {
LOG.debug("A metric with the name " + jmxName + " was already registered.", e);
LOG.error("A metric with the name " + jmxName + " was already registered.");
LOG.warn("A metric with the name " + jmxName + " was already registered.", e);
} catch (Throwable t) {
LOG.error("Failed to register metric", t);
LOG.warn("Failed to register metric", t);
}
}
......@@ -209,27 +222,18 @@ public class JMXReporter implements MetricReporter {
// Utilities
// ------------------------------------------------------------------------
static String generateJmxName(String metricName, String[] scopeComponents) {
final StringBuilder nameBuilder = new StringBuilder(128);
nameBuilder.append(PREFIX);
for (int x = 0; x < scopeComponents.length; x++) {
// write keyX=
nameBuilder.append(KEY_PREFIX);
nameBuilder.append(x);
nameBuilder.append("=");
// write scope component
nameBuilder.append(replaceInvalidChars(scopeComponents[x]));
nameBuilder.append(",");
/**
 * Builds the key-property table of a JMX object name from a set of variables.
 *
 * <p>Each key and each value is passed through {@code replaceInvalidChars(String)} before it
 * is stored in the table.
 *
 * @param variables variable names mapped to their values
 * @return a newly created table holding the sanitized key/value pairs
 */
static Hashtable<String, String> generateJmxTable(Map<String, String> variables) {
	final Hashtable<String, String> properties = new Hashtable<>(variables.size());
	for (Map.Entry<String, String> entry : variables.entrySet()) {
		final String sanitizedKey = replaceInvalidChars(entry.getKey());
		final String sanitizedValue = replaceInvalidChars(entry.getValue());
		properties.put(sanitizedKey, sanitizedValue);
	}
	return properties;
}
// write the name
nameBuilder.append("name=").append(replaceInvalidChars(metricName));
return nameBuilder.toString();
/**
 * Builds the JMX domain of a metric: {@code JMX_DOMAIN_PREFIX}, followed by the logical scope
 * of the group with its components joined by {@code '.'}, followed by {@code '.'} and the
 * metric name.
 *
 * <p>The metric name is passed through {@code replaceInvalidChars(String)}, just like the scope
 * components (which are filtered via {@code CHARACTER_FILTER}). A raw name containing a
 * character such as {@code ':'} would otherwise yield a malformed JMX domain and the metric
 * would not be registered.
 *
 * @param metricName name of the metric
 * @param group group the metric belongs to; it is cast to {@code FrontMetricGroup}, so any
 *              other implementation results in a {@code ClassCastException}
 * @return the JMX domain for the metric
 */
static String generateJmxDomain(String metricName, MetricGroup group) {
	final String logicalScope =
		((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, '.');
	return JMX_DOMAIN_PREFIX + logicalScope + '.' + replaceInvalidChars(metricName);
}
/**
* Lightweight method to replace unsupported characters.
* If the string does not contain any unsupported characters, this method creates no
......@@ -251,6 +255,8 @@ public class JMXReporter implements MetricReporter {
for (int i = 0; i < strLen; i++) {
final char c = str.charAt(i);
switch (c) {
case '>':
case '<':
case '"':
// remove character by not moving cursor
if (chars == null) {
......
......@@ -25,6 +25,7 @@ import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.apache.flink.runtime.metrics.util.TestingHistogram;
......@@ -40,8 +41,12 @@ import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import static org.apache.flink.metrics.jmx.JMXReporter.JMX_DOMAIN_PREFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......@@ -65,14 +70,20 @@ public class JMXReporterTest extends TestLogger {
}
/**
* Verifies that the JMXReporter properly generates the JMX name.
* Verifies that the JMXReporter properly generates the JMX table.
*/
@Test
public void testGenerateName() {
String[] scope = {"value0", "value1", "\"value2 (test),=;:?'"};
String jmxName = JMXReporter.generateJmxName("TestMetric", scope);
public void testGenerateTable() {
Map<String, String> vars = new HashMap<>();
vars.put("key0", "value0");
vars.put("key1", "value1");
vars.put("\"key2,=;:?'", "\"value2 (test),=;:?'");
assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric", jmxName);
Hashtable<String, String> jmxTable = JMXReporter.generateJmxTable(vars);
assertEquals("value0", jmxTable.get("key0"));
assertEquals("value1", jmxTable.get("key1"));
assertEquals("value2_(test)------", jmxTable.get("key2------"));
}
/**
......@@ -102,28 +113,34 @@ public class JMXReporterTest extends TestLogger {
MetricReporter rep1 = reporters.get(0);
MetricReporter rep2 = reporters.get(1);
rep1.notifyOfAddedMetric(new Gauge<Integer>() {
Gauge<Integer> g1 = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 1;
}
}, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
rep2.notifyOfAddedMetric(new Gauge<Integer>() {
};
Gauge<Integer> g2 = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 2;
}
}, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
};
rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm")));
rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm")));
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
rep1.notifyOfRemovedMetric(g1, "rep1", null);
rep1.notifyOfRemovedMetric(g2, "rep2", null);
mg.close();
reg.shutdown();
}
......@@ -156,22 +173,25 @@ public class JMXReporterTest extends TestLogger {
MetricReporter rep1 = reporters.get(0);
MetricReporter rep2 = reporters.get(1);
rep1.notifyOfAddedMetric(new Gauge<Integer>() {
Gauge<Integer> g1 = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 1;
}
}, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
rep2.notifyOfAddedMetric(new Gauge<Integer>() {
};
Gauge<Integer> g2 = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 2;
}
}, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
};
ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm")));
rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(1, new TaskManagerMetricGroup(reg, "host", "tm")));
ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jmxrmi");
JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
......@@ -189,10 +209,14 @@ public class JMXReporterTest extends TestLogger {
assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
rep1.notifyOfRemovedMetric(g1, "rep1", null);
rep1.notifyOfRemovedMetric(g2, "rep2", null);
jmxCon2.close();
rep1.close();
rep2.close();
mg.close();
reg.shutdown();
}
......@@ -219,7 +243,7 @@ public class JMXReporterTest extends TestLogger {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents()));
ObjectName objectName = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager." + histogramName, JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
......@@ -269,7 +293,7 @@ public class JMXReporterTest extends TestLogger {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(meterName, metricGroup.getScopeComponents()));
ObjectName objectName = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager." + meterName, JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
......
......@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
......@@ -38,6 +39,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
......@@ -83,8 +85,9 @@ public class JMXJobManagerMetricTest {
Await.ready(jobRunning, deadline.timeLeft());
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName1 = new ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize");
assertEquals(-1L, mBeanServer.getAttribute(objectName1, "Value"));
Set<ObjectName> nameSet = mBeanServer.queryNames(new ObjectName("org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*"), null);
Assert.assertEquals(1, nameSet.size());
assertEquals(-1L, mBeanServer.getAttribute(nameSet.iterator().next(), "Value"));
Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft())
.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
......
......@@ -89,6 +89,10 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
* For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
private String scopeString;
/** The logical metrics scope represented by this group, as a concatenated string, lazily computed.
* For example: "taskmanager.job.task" */
private String logicalScopeString;
/** The metrics query service scope represented by this group, lazily computed. */
protected QueryScopeInfo queryServiceScopeInfo;
......@@ -118,6 +122,43 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
return variables;
}
/**
 * Returns the logical scope of this group, for example
 * {@code "taskmanager.job.task"}, with the components joined by the
 * delimiter obtained from the registry.
 *
 * @param filter character filter which is applied to the scope components
 * @return logical scope
 */
public String getLogicalScope(CharacterFilter filter) {
	final char registryDelimiter = registry.getDelimiter();
	return getLogicalScope(filter, registryDelimiter);
}
/**
 * Returns the logical scope of this group, for example
 * {@code "taskmanager.job.task"}, with the components joined by the given delimiter.
 *
 * <p>The scope consists of the parent's logical scope (when there is a parent), the delimiter,
 * and the name of this group.
 *
 * <p>The result is computed on every call and deliberately not memoised in
 * {@code logicalScopeString}: it depends on both arguments, so a single cached string would be
 * handed back unchanged to a caller that passes a different filter or delimiter.
 *
 * @param filter character filter which is applied to the scope components
 * @param delimiter character placed between the scope components
 * @return logical scope
 */
public String getLogicalScope(CharacterFilter filter, char delimiter) {
	if (parent == null) {
		return getGroupName(filter);
	}
	return parent.getLogicalScope(filter, delimiter) + delimiter + getGroupName(filter);
}
/**
 * Returns the name for this group, meaning what kind of entity it represents, for example "taskmanager".
 *
 * <p>The returned name is used as this group's component when the logical scope is assembled.
 *
 * @param filter character filter which is applied to the name
 * @return logical name for this group
 */
protected abstract String getGroupName(CharacterFilter filter);
/**
* Gets the scope as an array of the scope components, for example
* {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
......
......@@ -46,4 +46,12 @@ public class FrontMetricGroup<P extends AbstractMetricGroup<?>> extends ProxyMet
public String getMetricIdentifier(String metricName, CharacterFilter filter) {
return parentMetricGroup.getMetricIdentifier(metricName, filter, this.reporterIndex);
}
/**
 * Returns the logical scope of the wrapped parent group.
 *
 * @param filter character filter forwarded to the parent group
 * @return logical scope as reported by the parent group
 */
public String getLogicalScope(CharacterFilter filter) {
	final String parentScope = parentMetricGroup.getLogicalScope(filter);
	return parentScope;
}
/**
 * Returns the logical scope of the wrapped parent group, using the given delimiter.
 *
 * @param filter character filter forwarded to the parent group
 * @param delimiter delimiter forwarded to the parent group
 * @return logical scope as reported by the parent group
 */
public String getLogicalScope(CharacterFilter filter, char delimiter) {
	final String parentScope = parentMetricGroup.getLogicalScope(filter, delimiter);
	return parentScope;
}
}
......@@ -56,4 +56,9 @@ public class GenericMetricGroup extends AbstractMetricGroup<AbstractMetricGroup<
}
return new String[] { name };
}
/**
 * Returns the name of this group after the given filter has been applied to it.
 *
 * @param filter character filter applied to the group name
 * @return filtered group name
 */
@Override
protected String getGroupName(CharacterFilter filter) {
	final String filteredName = filter.filterCharacters(name);
	return filteredName;
}
}
......@@ -107,5 +107,10 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return jobs.values();
}
/**
 * Returns the constant group name {@code "jobmanager"}; the filter is not applied to it.
 */
@Override
protected String getGroupName(CharacterFilter filter) {
return "jobmanager";
}
}
......@@ -81,4 +81,9 @@ public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends
variables.put(ScopeFormat.SCOPE_JOB_ID, jobId.toString());
variables.put(ScopeFormat.SCOPE_JOB_NAME, jobName);
}
/**
 * Returns the constant group name {@code "job"}; the filter is not applied to it.
 */
@Override
protected String getGroupName(CharacterFilter filter) {
return "job";
}
}
......@@ -83,4 +83,9 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return Collections.emptyList();
}
/**
 * Returns the constant group name {@code "operator"}; the filter is not applied to it.
 */
@Override
protected String getGroupName(CharacterFilter filter) {
return "operator";
}
}
......@@ -135,5 +135,10 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return jobs.values();
}
/**
 * Returns the constant group name {@code "taskmanager"}; the filter is not applied to it.
 */
@Override
protected String getGroupName(CharacterFilter filter) {
return "taskmanager";
}
}
......@@ -169,4 +169,9 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return operators.values();
}
/**
 * Returns the constant group name {@code "task"}; the filter is not applied to it.
 */
@Override
protected String getGroupName(CharacterFilter filter) {
return "task";
}
}
......@@ -39,6 +39,11 @@ public class AbstractMetricGroupTest {
protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
return null;
}
// Stub for the anonymous test group: reports an empty group name and ignores the filter.
@Override
protected String getGroupName(CharacterFilter filter) {
return "";
}
};
assertTrue(group.getAllVariables().isEmpty());
......
......@@ -176,6 +176,11 @@ public class MetricGroupTest extends TestLogger {
return null;
}
// Stub for the test group: reports an empty group name and ignores the filter.
@Override
protected String getGroupName(CharacterFilter filter) {
return "";
}
@Override
protected void addMetric(String name, Metric metric) {}
......
......@@ -1504,13 +1504,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// connect to JMX
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// wait until we've found all 5 offset metrics
Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new ObjectName("*:key7=current-offsets,*"), null);
Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
while (offsetMetrics.size() < 5) { // test will time out if metrics are not properly working
if (error.f0 != null) {
// fail test early
throw error.f0;
}
offsetMetrics = mBeanServer.queryNames(new ObjectName("*:key7=current-offsets,*"), null);
offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
Thread.sleep(50);
}
Assert.assertEquals(5, offsetMetrics.size());
......@@ -1534,7 +1534,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
// check if producer metrics are also available.
Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*:key6=KafkaProducer,*"), null);
Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null);
Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册