提交 9bcbcf4a 编写于 作者: A Aljoscha Krettek

[FLINK-4246] Allow Specifying Multiple Metrics Reporters

This also updates documentation and tests.

Reporters can now be specified like this:

metrics.reporters: foo,bar

metrics.reporter.foo.class: JMXReporter.class
metrics.reporter.foo.port: 10

metrics.reporter.bar.class: GangliaReporter.class
metrics.reporter.bar.port: 11
metrics.reporter.bar.something: 42
上级 cd232e68
......@@ -227,14 +227,29 @@ or by assigning unique names to jobs and operators.
## Reporter
Metrics can be exposed to an external system by configuring a reporter in `conf/flink-conf.yaml`.
- `metrics.reporter.class`: The class of the reporter to use.
- Example: org.apache.flink.metrics.jmx.JMXReporter
- `metrics.reporter.arguments`: A list of named parameters that are passed to the reporter.
- Example: --host localhost --port 9010
- `metrics.reporter.interval`: The interval between reports.
- Example: 10 SECONDS
Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`.
- `metrics.reporters`: The list of named reporters.
- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`.
- `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`.
- `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`.
All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below,
we will list more settings specific to each reporter.
Example reporter configuration that specifies multiple reporters:
```
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
```
You can write your own `Reporter` by implementing the `org.apache.flink.metrics.reporter.MetricReporter` interface.
If the Reporter should send out reports regularly you have to implement the `Scheduled` interface as well.
......
......@@ -150,7 +150,7 @@ Default value is 1.
- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay".
Default value is the `akka.ask.timeout`.
- `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of restarts in given time interval before failing a job in "failure-rate" strategy.
- `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of restarts in given time interval before failing a job in "failure-rate" strategy.
Default value is 1.
- `restart-strategy.failure-rate.failure-rate-interval`: Time interval for measuring failure rate in "failure-rate" strategy.
......@@ -288,7 +288,7 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use
- `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create namespace directories.
- `recovery.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper.
- `recovery.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper.
- `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.
......@@ -312,7 +312,13 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use
## Metrics
- `metrics.jmx.port`: (Default: 9010-9025) Defines the port used by JMX.
- `metrics.reporters`: The list of named reporters, i.e. "foo,bar".
- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`.
- `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`.
- `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`.
- `metrics.scope.jm`: (Default: &lt;host&gt;.jobmanager) Defines the scope format string that is applied to all metrics scoped to a JobManager.
......
......@@ -668,14 +668,34 @@ public final class ConfigConstants {
// ---------------------------- Metrics -----------------------------------
/** The class of the reporter to use. */
public static final String METRICS_REPORTER_CLASS = "metrics.reporter.class";
/** A list of named parameters that are passed to the reporter. */
public static final String METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments";
/**
* The list of named reporters. Names are defined here and per-reporter configs
* are given with the reporter config prefix and the reporter name.
*
* Example:
* <pre>{@code
* metrics.reporters = foo, bar
*
* metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
* metrics.reporter.foo.interval = 10
*
* metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
* metrics.reporter.bar.port = 1337
* }</pre>
*/
public static final String METRICS_REPORTERS_LIST = "metrics.reporters";
/**
* The prefix for per-reporter configs. Has to be combined with a reporter name and
* the configs mentioned below.
*/
public static final String METRICS_REPORTER_PREFIX = "metrics.reporter.";
/** The class of the reporter to use. This is used as a suffix in an actual reporter config */
public static final String METRICS_REPORTER_CLASS_SUFFIX = "class";
/** The interval between reports. */
public static final String METRICS_REPORTER_INTERVAL = "metrics.reporter.interval";
/** The interval between reports. This is used as a suffix in an actual reporter config */
public static final String METRICS_REPORTER_INTERVAL_SUFFIX = "interval";
/** The delimiter used to assemble the metric identifier. */
public static final String METRICS_SCOPE_DELIMITER = "metrics.scope.delimiter";
......
......@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.annotation.Public;
......@@ -56,7 +57,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
/** Stores the concrete key/value pairs of this configuration object. */
private final HashMap<String, Object> confData;
protected final HashMap<String, Object> confData;
// --------------------------------------------------------------------------------------------
......@@ -420,6 +421,17 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
}
}
/**
* Adds all entries in this {@code Configuration} to the given {@link Properties}.
*/
public void addAllToProperties(Properties props) {
synchronized (this.confData) {
for (Map.Entry<String, Object> entry : this.confData.entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
}
}
public void addAll(Configuration other) {
synchronized (this.confData) {
synchronized (other.confData) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
* A configuration that manages a subset of keys with a common prefix from a given configuration.
*/
public final class DelegatingConfiguration extends Configuration {
private static final long serialVersionUID = 1L;
private final Configuration backingConfig; // the configuration actually storing the data
private String prefix; // the prefix key by which keys for this config are marked
// --------------------------------------------------------------------------------------------
/**
* Default constructor for serialization. Creates an empty delegating configuration.
*/
public DelegatingConfiguration() {
this.backingConfig = new Configuration();
this.prefix = "";
}
/**
* Creates a new delegating configuration which stores its key/value pairs in the given
* configuration using the specifies key prefix.
*
* @param backingConfig The configuration holding the actual config data.
* @param prefix The prefix prepended to all config keys.
*/
public DelegatingConfiguration(Configuration backingConfig, String prefix)
{
this.backingConfig = backingConfig;
this.prefix = prefix;
}
// --------------------------------------------------------------------------------------------
@Override
public String getString(String key, String defaultValue) {
return this.backingConfig.getString(this.prefix + key, defaultValue);
}
@Override
public void setString(String key, String value) {
this.backingConfig.setString(this.prefix + key, value);
}
@Override
public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader);
}
@Override
public void setClass(String key, Class<?> klazz) {
this.backingConfig.setClass(this.prefix + key, klazz);
}
@Override
public int getInteger(String key, int defaultValue) {
return this.backingConfig.getInteger(this.prefix + key, defaultValue);
}
@Override
public void setInteger(String key, int value) {
this.backingConfig.setInteger(this.prefix + key, value);
}
@Override
public long getLong(String key, long defaultValue) {
return this.backingConfig.getLong(this.prefix + key, defaultValue);
}
@Override
public void setLong(String key, long value) {
this.backingConfig.setLong(this.prefix + key, value);
}
@Override
public boolean getBoolean(String key, boolean defaultValue) {
return this.backingConfig.getBoolean(this.prefix + key, defaultValue);
}
@Override
public void setBoolean(String key, boolean value) {
this.backingConfig.setBoolean(this.prefix + key, value);
}
@Override
public float getFloat(String key, float defaultValue) {
return this.backingConfig.getFloat(this.prefix + key, defaultValue);
}
@Override
public void setFloat(String key, float value) {
this.backingConfig.setFloat(this.prefix + key, value);
}
@Override
public double getDouble(String key, double defaultValue) {
return this.backingConfig.getDouble(this.prefix + key, defaultValue);
}
@Override
public void setDouble(String key, double value) {
this.backingConfig.setDouble(this.prefix + key, value);
}
@Override
public byte[] getBytes(final String key, final byte[] defaultValue) {
return this.backingConfig.getBytes(this.prefix + key, defaultValue);
}
@Override
public void setBytes(final String key, final byte[] bytes) {
this.backingConfig.setBytes(this.prefix + key, bytes);
}
@Override
public void addAllToProperties(Properties props) {
// only add keys with our prefix
synchronized (backingConfig.confData) {
for (Map.Entry<String, Object> entry : backingConfig.confData.entrySet()) {
if (entry.getKey().startsWith(prefix)) {
String keyWithoutPrefix =
entry.getKey().substring(prefix.length(),
entry.getKey().length());
props.put(keyWithoutPrefix, entry.getValue());
} else {
// don't add stuff that doesn't have our prefix
}
}
}
}
@Override
public void addAll(Configuration other) {
this.addAll(other, "");
}
@Override
public void addAll(Configuration other, String prefix) {
this.backingConfig.addAll(other, this.prefix + prefix);
}
@Override
public String toString() {
return backingConfig.toString();
}
@Override
public Set<String> keySet() {
final HashSet<String> set = new HashSet<String>();
final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
for (String key : this.backingConfig.keySet()) {
if (key.startsWith(this.prefix)) {
set.add(key.substring(prefixLen));
}
}
return set;
}
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInputView in) throws IOException {
this.prefix = in.readUTF();
this.backingConfig.read(in);
}
@Override
public void write(DataOutputView out) throws IOException {
out.writeUTF(this.prefix);
this.backingConfig.write(out);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return this.prefix.hashCode() ^ this.backingConfig.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof DelegatingConfiguration) {
DelegatingConfiguration other = (DelegatingConfiguration) obj;
return this.prefix.equals(other.prefix) && this.backingConfig.equals(other.backingConfig);
} else {
return false;
}
}
}
......@@ -20,6 +20,8 @@ package org.apache.flink.configuration;
import org.apache.flink.annotation.Public;
import java.util.Properties;
/**
* Unmodifiable version of the Configuration class.
*/
......@@ -42,6 +44,13 @@ public class UnmodifiableConfiguration extends Configuration {
// All mutating methods must fail
// --------------------------------------------------------------------------------------------
@Override
public void addAllToProperties(Properties props) {
// override to make the UnmodifiableConfigurationTest happy
super.addAllToProperties(props);
}
@Override
public final void addAll(Configuration other) {
error();
......
......@@ -17,9 +17,9 @@
*/
package org.apache.flink.runtime.util;
package org.apache.flink.configuration;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
......@@ -27,9 +27,7 @@ import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Comparator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.util.TaskConfig.DelegatingConfiguration;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class DelegatingConfigurationTest {
......
......@@ -31,10 +31,10 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.util.AbstractID;
import org.junit.Test;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
......@@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
public class ScheduledDropwizardReporterTest {
@Test
public void testInvalidCharacterReplacement() {
public void testInvalidCharacterReplacement() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
ScheduledDropwizardReporter reporter = new ScheduledDropwizardReporter() {
@Override
public ScheduledReporter getReporter(MetricConfig config) {
......@@ -68,7 +68,11 @@ public class ScheduledDropwizardReporterTest {
String taskManagerId = "tas:kMana::ger";
String counterName = "testCounter";
configuration.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
......@@ -84,10 +88,10 @@ public class ScheduledDropwizardReporterTest {
taskMetricGroup.counter(counterName, myCounter);
Field reporterField = MetricRegistry.class.getDeclaredField("reporter");
reporterField.setAccessible(true);
List<MetricReporter> reporters = metricRegistry.getReporters();
MetricReporter metricReporter = (MetricReporter) reporterField.get(metricRegistry);
assertTrue(reporters.size() == 1);
MetricReporter metricReporter = reporters.get(0);
assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter);
......
......@@ -96,8 +96,9 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
int size = 10;
String histogramMetricName = "histogram";
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestingReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, reportingInterval + " MILLISECONDS");
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");
MetricRegistry registry = null;
......@@ -112,10 +113,9 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
String fullMetricName = metricGroup.getMetricIdentifier(histogramMetricName);
Field f = registry.getClass().getDeclaredField("reporter");
f.setAccessible(true);
assertTrue(registry.getReporters().size() == 1);
MetricReporter reporter = (MetricReporter) f.get(registry);
MetricReporter reporter = registry.getReporters().get(0);
assertTrue(reporter instanceof TestingReporter);
......
......@@ -23,9 +23,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
......@@ -38,8 +39,10 @@ import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.lang.management.ManagementFactory;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class JMXReporterTest extends TestLogger {
......@@ -79,18 +82,24 @@ public class JMXReporterTest extends TestLogger {
@Test
public void testPortConflictHandling() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035");
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9020-9035");
MetricRegistry reg = new MetricRegistry(cfg);
TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
JMXReporter rep1 = new JMXReporter();
JMXReporter rep2 = new JMXReporter();
List<MetricReporter> reporters = reg.getReporters();
MetricConfig cfg1 = new MetricConfig();
cfg1.setProperty("port", "9020-9035");
assertTrue(reporters.size() == 2);
rep1.open(cfg1);
rep2.open(cfg1);
MetricReporter rep1 = reporters.get(0);
MetricReporter rep2 = reporters.get(1);
rep1.notifyOfAddedMetric(new Gauge<Integer>() {
@Override
......@@ -114,8 +123,6 @@ public class JMXReporterTest extends TestLogger {
assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
rep1.close();
rep2.close();
reg.shutdown();
}
......@@ -127,17 +134,26 @@ public class JMXReporterTest extends TestLogger {
@Test
public void testJMXAvailability() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055");
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9040-9055");
MetricRegistry reg = new MetricRegistry(cfg);
TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
JMXReporter rep1 = new JMXReporter();
JMXReporter rep2 = new JMXReporter();
List<MetricReporter> reporters = reg.getReporters();
assertTrue(reporters.size() == 2);
MetricConfig cfg1 = new MetricConfig();
cfg1.setProperty("port", "9040-9055");
rep1.open(cfg1);
rep2.open(cfg1);
MetricReporter rep1 = reporters.get(0);
MetricReporter rep2 = reporters.get(1);
rep1.notifyOfAddedMetric(new Gauge<Integer>() {
@Override
......@@ -156,29 +172,23 @@ public class JMXReporterTest extends TestLogger {
ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + rep1.getPort() + "/jndi/rmi://localhost:" + rep1.getPort() + "/jmxrmi");
JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jmxrmi");
JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
assertEquals(1, mCon1.getAttribute(objectName1, "Value"));
assertEquals(2, mCon1.getAttribute(objectName2, "Value"));
url1 = null;
jmxCon1.close();
jmxCon1 = null;
mCon1 = null;
JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + rep2.getPort() + "/jndi/rmi://localhost:" + rep2.getPort() + "/jmxrmi");
JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep2).getPort() + "/jmxrmi");
JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
url2 = null;
jmxCon2.close();
jmxCon2 = null;
mCon2 = null;
rep1.close();
rep2.close();
......@@ -195,7 +205,8 @@ public class JMXReporterTest extends TestLogger {
try {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
registry = new MetricRegistry(config);
......
......@@ -19,12 +19,12 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.junit.Test;
......@@ -52,9 +52,11 @@ public class JMXJobManagerMetricTest {
Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName());
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075");
flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--port 9060-9075");
TestingCluster flink = new TestingCluster(flinkConfiguration);
......
......@@ -36,12 +36,12 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
......@@ -73,7 +73,11 @@ public class StatsDReporterTest extends TestLogger {
String taskManagerId = "tas:kMana::ger";
String counterName = "testCounter";
configuration.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
......@@ -89,10 +93,11 @@ public class StatsDReporterTest extends TestLogger {
taskMetricGroup.counter(counterName, myCounter);
Field reporterField = MetricRegistry.class.getDeclaredField("reporter");
reporterField.setAccessible(true);
List<MetricReporter> reporters = metricRegistry.getReporters();
assertTrue(reporters.size() == 1);
MetricReporter metricReporter = (MetricReporter) reporterField.get(metricRegistry);
MetricReporter metricReporter = reporters.get(0);
assertTrue("Reporter should be of type StatsDReporter", metricReporter instanceof StatsDReporter);
......@@ -138,9 +143,11 @@ public class StatsDReporterTest extends TestLogger {
int port = receiver.getPort();
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, StatsDReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "1 SECONDS");
config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--host localhost --port " + port);
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + port);
registry = new MetricRegistry(config);
......@@ -308,6 +315,7 @@ public class StatsDReporterTest extends TestLogger {
byte[] buffer = new byte[1024];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
String line = new String(packet.getData(), 0, packet.getLength());
......
......@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
......@@ -30,6 +31,8 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
......@@ -41,9 +44,9 @@ import java.util.concurrent.TimeUnit;
*/
public class MetricRegistry {
static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
private final MetricReporter reporter;
private final ScheduledExecutorService executor;
private List<MetricReporter> reporters;
private ScheduledExecutorService executor;
private final ScopeFormats scopeFormats;
......@@ -74,56 +77,68 @@ public class MetricRegistry {
this.delimiter = delim;
// second, instantiate any custom configured reporters
this.reporters = new ArrayList<>();
final String definedReporters = config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
if (className == null) {
if (definedReporters == null) {
// no reporters defined
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
this.reporter = null;
this.executor = null;
}
else {
MetricReporter reporter;
ScheduledExecutorService executor = null;
try {
String configuredPeriod = config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
TimeUnit timeunit = TimeUnit.SECONDS;
long period = 10;
if (configuredPeriod != null) {
try {
String[] interval = configuredPeriod.split(" ");
period = Long.parseLong(interval[0]);
timeunit = TimeUnit.valueOf(interval[1]);
}
catch (Exception e) {
LOG.error("Cannot parse report interval from config: " + configuredPeriod +
" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
"Using default reporting interval.");
}
} else {
// we have some reporters so
String[] namedReporters = definedReporters.split("\\s*,\\s*");
for (String namedReporter : namedReporters) {
DelegatingConfiguration reporterConfig = new DelegatingConfiguration(config, ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + ".");
final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
if (className == null) {
LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
continue;
}
MetricConfig reporterConfig = createReporterConfig(config);
try {
String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
TimeUnit timeunit = TimeUnit.SECONDS;
long period = 10;
if (configuredPeriod != null) {
try {
String[] interval = configuredPeriod.split(" ");
period = Long.parseLong(interval[0]);
timeunit = TimeUnit.valueOf(interval[1]);
}
catch (Exception e) {
LOG.error("Cannot parse report interval from config: " + configuredPeriod +
" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
"Using default reporting interval.");
}
}
Class<?> reporterClass = Class.forName(className);
reporter = (MetricReporter) reporterClass.newInstance();
reporter.open(reporterConfig);
Class<?> reporterClass = Class.forName(className);
MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();
if (reporter instanceof Scheduled) {
executor = Executors.newSingleThreadScheduledExecutor();
LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
MetricConfig metricConfig = new MetricConfig();
reporterConfig.addAllToProperties(metricConfig);
reporterInstance.open(metricConfig);
executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
if (reporterInstance instanceof Scheduled) {
if (this.executor == null) {
executor = Executors.newSingleThreadScheduledExecutor();
}
LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
executor.scheduleWithFixedDelay(
new ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
}
reporters.add(reporterInstance);
}
catch (Throwable t) {
shutdownExecutor();
LOG.error("Could not instantiate metrics reporter" + namedReporter + ". Metrics might not be exposed/reported.", t);
}
}
catch (Throwable t) {
shutdownExecutor();
LOG.info("Could not instantiate metrics reporter. No metrics will be exposed/reported.", t);
reporter = null;
}
this.reporter = reporter;
this.executor = executor;
}
}
......@@ -131,24 +146,27 @@ public class MetricRegistry {
return this.delimiter;
}
public MetricReporter getReporter() {
return reporter;
public List<MetricReporter> getReporters() {
return reporters;
}
/**
* Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
*/
public void shutdown() {
if (reporter != null) {
try {
reporter.close();
} catch (Throwable t) {
LOG.warn("Metrics reporter did not shut down cleanly", t);
if (reporters != null) {
for (MetricReporter reporter : reporters) {
try {
reporter.close();
} catch (Throwable t) {
LOG.warn("Metrics reporter did not shut down cleanly", t);
}
}
reporters = null;
}
shutdownExecutor();
}
private void shutdownExecutor() {
if (executor != null) {
executor.shutdown();
......@@ -180,8 +198,12 @@ public class MetricRegistry {
*/
public void register(Metric metric, String metricName, MetricGroup group) {
try {
if (reporter != null) {
reporter.notifyOfAddedMetric(metric, metricName, group);
if (reporters != null) {
for (MetricReporter reporter : reporters) {
if (reporter != null) {
reporter.notifyOfAddedMetric(metric, metricName, group);
}
}
}
} catch (Exception e) {
LOG.error("Error while registering metric.", e);
......@@ -197,8 +219,12 @@ public class MetricRegistry {
*/
public void unregister(Metric metric, String metricName, MetricGroup group) {
try {
if (reporter != null) {
reporter.notifyOfRemovedMetric(metric, metricName, group);
if (reporters != null) {
for (MetricReporter reporter : reporters) {
if (reporter != null) {
reporter.notifyOfRemovedMetric(metric, metricName, group);
}
}
}
} catch (Exception e) {
LOG.error("Error while registering metric.", e);
......@@ -208,17 +234,6 @@ public class MetricRegistry {
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
static MetricConfig createReporterConfig(Configuration config) {
MetricConfig reporterConfig = new MetricConfig();
String[] arguments = config.getString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "").split(" ");
if (arguments.length > 1) {
for (int x = 0; x < arguments.length; x += 2) {
reporterConfig.setProperty(arguments[x].replace("--", ""), arguments[x + 1]);
}
}
return reporterConfig;
}
static ScopeFormats createScopeConfig(Configuration config) {
String jmFormat = config.getString(
......
......@@ -25,9 +25,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
......@@ -40,9 +38,8 @@ import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.Driver;
......@@ -1162,184 +1159,5 @@ public class TaskConfig implements Serializable {
public boolean isSolutionSetUnmanaged() {
return config.getBoolean(SOLUTION_SET_OBJECTS, false);
}
// --------------------------------------------------------------------------------------------
// Utility class for nested Configurations
// --------------------------------------------------------------------------------------------
/**
* A configuration that manages a subset of keys with a common prefix from a given configuration.
*/
public static final class DelegatingConfiguration extends Configuration {
private static final long serialVersionUID = 1L;
private final Configuration backingConfig; // the configuration actually storing the data
private String prefix; // the prefix key by which keys for this config are marked
// --------------------------------------------------------------------------------------------
/**
* Default constructor for serialization. Creates an empty delegating configuration.
*/
public DelegatingConfiguration() {
this.backingConfig = new Configuration();
this.prefix = "";
}
/**
* Creates a new delegating configuration which stores its key/value pairs in the given
* configuration using the specifies key prefix.
*
* @param backingConfig The configuration holding the actual config data.
* @param prefix The prefix prepended to all config keys.
*/
public DelegatingConfiguration(Configuration backingConfig, String prefix)
{
this.backingConfig = backingConfig;
this.prefix = prefix;
}
// --------------------------------------------------------------------------------------------
@Override
public String getString(String key, String defaultValue) {
return this.backingConfig.getString(this.prefix + key, defaultValue);
}
@Override
public void setString(String key, String value) {
this.backingConfig.setString(this.prefix + key, value);
}
@Override
public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader);
}
@Override
public void setClass(String key, Class<?> klazz) {
this.backingConfig.setClass(this.prefix + key, klazz);
}
@Override
public int getInteger(String key, int defaultValue) {
return this.backingConfig.getInteger(this.prefix + key, defaultValue);
}
@Override
public void setInteger(String key, int value) {
this.backingConfig.setInteger(this.prefix + key, value);
}
@Override
public long getLong(String key, long defaultValue) {
return this.backingConfig.getLong(this.prefix + key, defaultValue);
}
@Override
public void setLong(String key, long value) {
this.backingConfig.setLong(this.prefix + key, value);
}
@Override
public boolean getBoolean(String key, boolean defaultValue) {
return this.backingConfig.getBoolean(this.prefix + key, defaultValue);
}
@Override
public void setBoolean(String key, boolean value) {
this.backingConfig.setBoolean(this.prefix + key, value);
}
@Override
public float getFloat(String key, float defaultValue) {
return this.backingConfig.getFloat(this.prefix + key, defaultValue);
}
@Override
public void setFloat(String key, float value) {
this.backingConfig.setFloat(this.prefix + key, value);
}
@Override
public double getDouble(String key, double defaultValue) {
return this.backingConfig.getDouble(this.prefix + key, defaultValue);
}
@Override
public void setDouble(String key, double value) {
this.backingConfig.setDouble(this.prefix + key, value);
}
@Override
public byte[] getBytes(final String key, final byte[] defaultValue) {
return this.backingConfig.getBytes(this.prefix + key, defaultValue);
}
@Override
public void setBytes(final String key, final byte[] bytes) {
this.backingConfig.setBytes(this.prefix + key, bytes);
}
@Override
public void addAll(Configuration other) {
this.addAll(other, "");
}
@Override
public void addAll(Configuration other, String prefix) {
this.backingConfig.addAll(other, this.prefix + prefix);
}
@Override
public String toString() {
return backingConfig.toString();
}
@Override
public Set<String> keySet() {
final HashSet<String> set = new HashSet<String>();
final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
for (String key : this.backingConfig.keySet()) {
if (key.startsWith(this.prefix)) {
set.add(key.substring(prefixLen));
}
}
return set;
}
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInputView in) throws IOException {
this.prefix = in.readUTF();
this.backingConfig.read(in);
}
@Override
public void write(DataOutputView out) throws IOException {
out.writeUTF(this.prefix);
this.backingConfig.write(out);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return this.prefix.hashCode() ^ this.backingConfig.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof DelegatingConfiguration) {
DelegatingConfiguration other = (DelegatingConfiguration) obj;
return this.prefix.equals(other.prefix) && this.backingConfig.equals(other.backingConfig);
} else {
return false;
}
}
}
}
......@@ -85,7 +85,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestingReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
Configuration jobConfig = new Configuration();
......@@ -93,7 +94,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
MetricRegistry metricRegistry = new MetricRegistry(config);
MetricReporter reporter = metricRegistry.getReporter();
assertTrue(metricRegistry.getReporters().size() == 1);
MetricReporter reporter = metricRegistry.getReporters().get(0);
assertTrue(reporter instanceof TestingReporter);
......
......@@ -45,9 +45,12 @@ public class MetricRegistryTest extends TestLogger {
public void testReporterInstantiation() {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
new MetricRegistry(config);
MetricRegistry metricRegistry = new MetricRegistry(config);
assertTrue(metricRegistry.getReporters().size() == 1);
Assert.assertTrue(TestReporter1.wasOpened);
}
......@@ -61,6 +64,54 @@ public class MetricRegistryTest extends TestLogger {
}
}
/**
* Verifies that multiple reporters are instantiated correctly.
*/
@Test
public void testMultipleReporterInstantiation() {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1, test2,test3");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
MetricRegistry metricRegistry = new MetricRegistry(config);
assertTrue(metricRegistry.getReporters().size() == 3);
Assert.assertTrue(TestReporter11.wasOpened);
Assert.assertTrue(TestReporter12.wasOpened);
Assert.assertTrue(TestReporter13.wasOpened);
}
protected static class TestReporter11 extends TestReporter {
public static boolean wasOpened = false;
@Override
public void open(MetricConfig config) {
wasOpened = true;
}
}
protected static class TestReporter12 extends TestReporter {
public static boolean wasOpened = false;
@Override
public void open(MetricConfig config) {
wasOpened = true;
}
}
protected static class TestReporter13 extends TestReporter {
public static boolean wasOpened = false;
@Override
public void open(MetricConfig config) {
wasOpened = true;
}
}
/**
* Verifies that configured arguments are properly forwarded to the reporter.
*/
......@@ -68,8 +119,10 @@ public class MetricRegistryTest extends TestLogger {
public void testReporterArgumentForwarding() {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--arg1 hello --arg2 world");
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
new MetricRegistry(config);
}
......@@ -91,8 +144,10 @@ public class MetricRegistryTest extends TestLogger {
public void testReporterScheduling() throws InterruptedException {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter3.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "50 MILLISECONDS");
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
new MetricRegistry(config);
......@@ -125,12 +180,14 @@ public class MetricRegistryTest extends TestLogger {
}
/**
* Verifies that reporters implementing the Listener interface are notified when Metrics are added or removed.
* Verifies that reporters are notified of added/removed metrics.
*/
@Test
public void testListener() {
public void testReporterNotifications() {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter6.class.getName());
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
MetricRegistry registry = new MetricRegistry(config);
......@@ -140,6 +197,8 @@ public class MetricRegistryTest extends TestLogger {
assertTrue(TestReporter6.addCalled);
assertTrue(TestReporter6.removeCalled);
assertTrue(TestReporter7.addCalled);
assertTrue(TestReporter7.removeCalled);
}
protected static class TestReporter6 extends TestReporter {
......@@ -161,6 +220,25 @@ public class MetricRegistryTest extends TestLogger {
}
}
protected static class TestReporter7 extends TestReporter {
public static boolean addCalled = false;
public static boolean removeCalled = false;
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
addCalled = true;
assertTrue(metric instanceof Counter);
assertEquals("rootCounter", metricName);
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
removeCalled = true;
Assert.assertTrue(metric instanceof Counter);
Assert.assertEquals("rootCounter", metricName);
}
}
/**
* Verifies that the scope configuration is properly extracted.
*/
......
......@@ -40,7 +40,8 @@ public class MetricGroupRegistrationTest {
@Test
public void testMetricInstantiation() {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
MetricRegistry registry = new MetricRegistry(config);
......
......@@ -102,7 +102,8 @@ public abstract class KafkaTestBase extends TestLogger {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName());
flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
flink = new ForkableFlinkMiniCluster(flinkConfig, false);
flink.start();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册