提交 e3fec1f9 编写于 作者: Z zentol

[FLINK-4192] [metrics] Move metrics classes out of 'flink-core'

- moved user-facing API to 'flink-metrics/flink-metrics-core'
- moved JMXReporter to 'flink-metrics/flink-metrics-jmx'
- moved remaining metric classes to 'flink-runtime'

This closes #2226
上级 e4fe89d6
......@@ -230,7 +230,7 @@ or by assigning unique names to jobs and operators.
Metrics can be exposed to an external system by configuring a reporter in `conf/flink-conf.yaml`.
- `metrics.reporter.class`: The class of the reporter to use.
- Example: org.apache.flink.metrics.reporter.JMXReporter
- Example: org.apache.flink.metrics.jmx.JMXReporter
- `metrics.reporter.arguments`: A list of named parameters that are passed to the reporter.
- Example: --host localhost --port 9010
- `metrics.reporter.interval`: The interval between reports.
......@@ -241,7 +241,7 @@ If the Reporter should send out reports regularly you have to implement the `Sch
The following sections list the supported reporters.
### JMX (org.apache.flink.metrics.reporter.JMXReporter)
### JMX (org.apache.flink.metrics.jmx.JMXReporter)
You don't have to include an additional dependency since the JMX reporter is available by default
but not activated.
......
......@@ -41,6 +41,12 @@ under the License.
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
......
......@@ -108,6 +108,13 @@ under the License.
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-jmx</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<!-- See main pom.xml for explanation of profiles -->
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics</artifactId>
<version>1.1-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-metrics-core</artifactId>
<name>flink-metrics-core</name>
</project>
......@@ -18,12 +18,9 @@
package org.apache.flink.metrics;
import org.apache.flink.annotation.PublicEvolving;
/**
* A Counter is a {@link Metric} that measures a count.
*/
@PublicEvolving
public interface Counter extends Metric {
/**
......
......@@ -18,13 +18,11 @@
package org.apache.flink.metrics;
import org.apache.flink.annotation.PublicEvolving;
/**
* A Gauge is a {@link Metric} that calculates a specific value at a point in time.
*/
@PublicEvolving
public interface Gauge<T> extends Metric {
/**
* Calculates and returns the measured value.
*
......
......@@ -18,15 +18,12 @@
package org.apache.flink.metrics;
import org.apache.flink.annotation.PublicEvolving;
/**
* Histogram interface to be used with Flink's metrics system.
*
* The histogram allows to record values, get the current count of recorded values and create
* histogram statistics for the currently seen elements.
*/
@PublicEvolving
public interface Histogram extends Metric {
/**
......
......@@ -18,15 +18,12 @@
package org.apache.flink.metrics;
import org.apache.flink.annotation.PublicEvolving;
/**
* Histogram statistics represent the current snapshot of elements recorded in the histogram.
*
* The histogram statistics allow to calculate values for quantiles, the mean, the standard
* deviation, the minimum and the maximum.
*/
@PublicEvolving
public abstract class HistogramStatistics {
/**
......
......@@ -18,11 +18,8 @@
package org.apache.flink.metrics;
import org.apache.flink.annotation.PublicEvolving;
/**
* Common super interface for all metrics.
*/
@PublicEvolving
public interface Metric {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics;
import java.util.Properties;
/**
 * A {@link Properties} subclass providing typed accessors for metric reporter
 * configuration values.
 *
 * <p>All values are stored as strings (inherited from {@link Properties}); the
 * {@code getXxx} methods parse the stored string on access and fall back to the
 * supplied default when the key is absent. The numeric accessors throw
 * {@link NumberFormatException} if a present value cannot be parsed; the
 * boolean accessor never throws and treats any value other than
 * (case-insensitive) {@code "true"} as {@code false}.
 */
public class MetricConfig extends Properties {

	// Properties is Serializable; pin the serialized form explicitly.
	private static final long serialVersionUID = 1L;

	/**
	 * Returns the value for the given key, or the default if the key is absent.
	 *
	 * @param key the property key
	 * @param defaultValue value returned when the key is not set
	 * @return the stored string value, or {@code defaultValue}
	 */
	public String getString(String key, String defaultValue) {
		return getProperty(key, defaultValue);
	}

	/**
	 * Returns the value for the given key parsed as an {@code int},
	 * or the default if the key is absent.
	 *
	 * @param key the property key
	 * @param defaultValue value returned when the key is not set
	 * @return the parsed value, or {@code defaultValue}
	 * @throws NumberFormatException if the stored value is not a valid integer
	 */
	public int getInteger(String key, int defaultValue) {
		String argument = getProperty(key, null);
		return argument == null
			? defaultValue
			: Integer.parseInt(argument);
	}

	/**
	 * Returns the value for the given key parsed as a {@code long},
	 * or the default if the key is absent.
	 *
	 * @param key the property key
	 * @param defaultValue value returned when the key is not set
	 * @return the parsed value, or {@code defaultValue}
	 * @throws NumberFormatException if the stored value is not a valid long
	 */
	public long getLong(String key, long defaultValue) {
		String argument = getProperty(key, null);
		return argument == null
			? defaultValue
			: Long.parseLong(argument);
	}

	/**
	 * Returns the value for the given key parsed as a {@code float},
	 * or the default if the key is absent.
	 *
	 * @param key the property key
	 * @param defaultValue value returned when the key is not set
	 * @return the parsed value, or {@code defaultValue}
	 * @throws NumberFormatException if the stored value is not a valid float
	 */
	public float getFloat(String key, float defaultValue) {
		String argument = getProperty(key, null);
		return argument == null
			? defaultValue
			: Float.parseFloat(argument);
	}

	/**
	 * Returns the value for the given key parsed as a {@code double},
	 * or the default if the key is absent.
	 *
	 * @param key the property key
	 * @param defaultValue value returned when the key is not set
	 * @return the parsed value, or {@code defaultValue}
	 * @throws NumberFormatException if the stored value is not a valid double
	 */
	public double getDouble(String key, double defaultValue) {
		String argument = getProperty(key, null);
		return argument == null
			? defaultValue
			: Double.parseDouble(argument);
	}

	/**
	 * Returns the value for the given key parsed as a {@code boolean},
	 * or the default if the key is absent.
	 *
	 * <p>Any present value that is not equal (ignoring case) to {@code "true"}
	 * yields {@code false}; this method never throws on malformed input.
	 *
	 * @param key the property key
	 * @param defaultValue value returned when the key is not set
	 * @return the parsed value, or {@code defaultValue}
	 */
	public boolean getBoolean(String key, boolean defaultValue) {
		String argument = getProperty(key, null);
		return argument == null
			? defaultValue
			: Boolean.parseBoolean(argument);
	}
}
......@@ -18,8 +18,6 @@
package org.apache.flink.metrics;
import org.apache.flink.annotation.PublicEvolving;
/**
* A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups.
*
......@@ -28,7 +26,6 @@ import org.apache.flink.annotation.PublicEvolving;
*
 * <p>A MetricGroup is uniquely identified by its place in the hierarchy and name.
*/
@PublicEvolving
public interface MetricGroup {
// ------------------------------------------------------------------------
......@@ -130,4 +127,36 @@ public interface MetricGroup {
* @return the created group
*/
MetricGroup addGroup(String name);
// ------------------------------------------------------------------------
// Scope
// ------------------------------------------------------------------------
/**
* Gets the scope as an array of the scope components, for example
* {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
*
* @see #getMetricIdentifier(String)
* @see #getMetricIdentifier(String, CharacterFilter)
*/
String[] getScopeComponents();
/**
* Returns the fully qualified metric name, for example
* {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
*
* @param metricName metric name
* @return fully qualified metric name
*/
String getMetricIdentifier(String metricName);
/**
* Returns the fully qualified metric name, for example
* {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
*
* @param metricName metric name
* @param filter character filter which is applied to the scope components if not null.
* @return fully qualified metric name
*/
String getMetricIdentifier(String metricName, CharacterFilter filter);
}
......@@ -21,6 +21,8 @@ package org.apache.flink.metrics;
* A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not thread-safe.
*/
public class SimpleCounter implements Counter {
/** the current count */
private long count;
/**
......
......@@ -18,7 +18,7 @@
package org.apache.flink.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
......@@ -29,7 +29,6 @@ import org.apache.flink.metrics.SimpleCounter;
* A special {@link MetricGroup} that does not register any metrics at the metrics registry
* and any reporters.
*/
@Internal
public class UnregisteredMetricsGroup implements MetricGroup {
@Override
......@@ -81,4 +80,19 @@ public class UnregisteredMetricsGroup implements MetricGroup {
public MetricGroup addGroup(String name) {
return new UnregisteredMetricsGroup();
}
@Override
public String[] getScopeComponents() {
return new String[0];
}
@Override
public String getMetricIdentifier(String metricName) {
return metricName;
}
@Override
public String getMetricIdentifier(String metricName, CharacterFilter filter) {
return metricName;
}
}
......@@ -18,13 +18,12 @@
package org.apache.flink.metrics.reporter;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,7 +33,6 @@ import java.util.Map;
/**
* Base interface for custom metric reporters.
*/
@PublicEvolving
public abstract class AbstractReporter implements MetricReporter, CharacterFilter {
protected final Logger log = LoggerFactory.getLogger(getClass());
......@@ -43,7 +41,7 @@ public abstract class AbstractReporter implements MetricReporter, CharacterFilte
protected final Map<Histogram, String> histograms = new HashMap<>();
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
final String name = group.getMetricIdentifier(metricName, this);
synchronized (this) {
......@@ -61,7 +59,7 @@ public abstract class AbstractReporter implements MetricReporter, CharacterFilte
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
synchronized (this) {
if (metric instanceof Counter) {
counters.remove(metric);
......
......@@ -18,10 +18,9 @@
package org.apache.flink.metrics.reporter;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
/**
* Reporters are used to export {@link Metric Metrics} to an external backend.
......@@ -29,7 +28,6 @@ import org.apache.flink.metrics.groups.AbstractMetricGroup;
* <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a
* public no-argument constructor.
*/
@PublicEvolving
public interface MetricReporter {
// ------------------------------------------------------------------------
......@@ -44,7 +42,7 @@ public interface MetricReporter {
*
* @param config The configuration with all parameters.
*/
void open(Configuration config);
void open(MetricConfig config);
/**
* Closes this reporter. Should be used to close channels, streams and release resources.
......@@ -62,7 +60,7 @@ public interface MetricReporter {
* @param metricName the name of the metric
* @param group the group that contains the metric
*/
void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group);
void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
/**
 * Called when a {@link Metric} should be removed.
......@@ -71,5 +69,5 @@ public interface MetricReporter {
* @param metricName the name of the metric
* @param group the group that contains the metric
*/
void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group);
void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
......@@ -18,17 +18,14 @@
package org.apache.flink.metrics.reporter;
import org.apache.flink.annotation.PublicEvolving;
/**
* Interface for reporters that actively send out data periodically.
*/
@PublicEvolving
public interface Scheduled {
/**
* Report the current measurements. This method is called periodically by the
* metrics registry that uses the reoprter.
* metrics registry that uses the reporter.
*/
void report();
}
......@@ -35,7 +35,14 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<artifactId>flink-annotations</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
......@@ -49,6 +56,13 @@ under the License.
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
......
......@@ -23,7 +23,6 @@ import com.codahale.metrics.Reporter;
import com.codahale.metrics.ScheduledReporter;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
......@@ -33,7 +32,8 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.slf4j.Logger;
......@@ -88,7 +88,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
// ------------------------------------------------------------------------
@Override
public void open(Configuration config) {
public void open(MetricConfig config) {
this.reporter = getReporter(config);
}
......@@ -102,7 +102,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
// ------------------------------------------------------------------------
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
final String fullName = group.getMetricIdentifier(metricName, this);
synchronized (this) {
......@@ -130,7 +130,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
synchronized (this) {
String fullName;
......@@ -200,5 +200,5 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
this.reporter.report(gauges, counters, histograms, meters, timers);
}
public abstract ScheduledReporter getReporter(Configuration config);
public abstract ScheduledReporter getReporter(MetricConfig config);
}
......@@ -23,17 +23,18 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.util.AbstractID;
import org.junit.Test;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import static org.junit.Assert.assertEquals;
......@@ -42,10 +43,10 @@ import static org.junit.Assert.assertTrue;
public class ScheduledDropwizardReporterTest {
@Test
public void testInvalidCharacterReplacement() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
public void testInvalidCharacterReplacement() {
ScheduledDropwizardReporter reporter = new ScheduledDropwizardReporter() {
@Override
public ScheduledReporter getReporter(Configuration config) {
public ScheduledReporter getReporter(MetricConfig config) {
return null;
}
};
......@@ -112,7 +113,7 @@ public class ScheduledDropwizardReporterTest {
public static class TestingScheduledDropwizardReporter extends ScheduledDropwizardReporter {
@Override
public ScheduledReporter getReporter(Configuration config) {
public ScheduledReporter getReporter(MetricConfig config) {
return null;
}
......
......@@ -29,9 +29,10 @@ import com.codahale.metrics.Timer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
......@@ -153,7 +154,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
TestingScheduledReporter scheduledReporter = null;
@Override
public ScheduledReporter getReporter(Configuration config) {
public ScheduledReporter getReporter(MetricConfig config) {
scheduledReporter = new TestingScheduledReporter(
registry,
getClass().getName(),
......
......@@ -42,7 +42,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
......
......@@ -23,8 +23,8 @@ import com.codahale.metrics.ScheduledReporter;
import info.ganglia.gmetric4j.gmetric.GMetric;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricConfig;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
......@@ -38,7 +38,7 @@ public class GangliaReporter extends ScheduledDropwizardReporter {
public static final String ARG_MODE_ADDRESSING = "addressingMode";
@Override
public ScheduledReporter getReporter(Configuration config) {
public ScheduledReporter getReporter(MetricConfig config) {
try {
String host = config.getString(ARG_HOST, null);
......
......@@ -42,7 +42,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
......
......@@ -22,8 +22,8 @@ import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.graphite.Graphite;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricConfig;
import java.util.concurrent.TimeUnit;
......@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
public class GraphiteReporter extends ScheduledDropwizardReporter {
@Override
public ScheduledReporter getReporter(Configuration config) {
public ScheduledReporter getReporter(MetricConfig config) {
String host = config.getString(ARG_HOST, null);
int port = config.getInteger(ARG_PORT, -1);
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics</artifactId>
<version>1.1-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-metrics-jmx</artifactId>
<name>flink-metrics-jmx</name>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-annotations</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -16,15 +16,16 @@
* limitations under the License.
*/
package org.apache.flink.metrics.reporter;
package org.apache.flink.metrics.jmx;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -55,7 +56,6 @@ import java.util.Map;
* Largely based on the JmxReporter class of the dropwizard metrics library
* https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
*/
@Internal
public class JMXReporter implements MetricReporter {
private static final String PREFIX = "org.apache.flink.metrics:";
......@@ -89,7 +89,7 @@ public class JMXReporter implements MetricReporter {
// ------------------------------------------------------------------------
@Override
public void open(Configuration config) {
public void open(MetricConfig config) {
String portsConfig = config.getString(ARG_PORT, null);
if (portsConfig != null) {
......@@ -142,7 +142,7 @@ public class JMXReporter implements MetricReporter {
// ------------------------------------------------------------------------
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
final String name = generateJmxName(metricName, group.getScopeComponents());
AbstractBean jmxMetric;
......@@ -183,7 +183,7 @@ public class JMXReporter implements MetricReporter {
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
try {
synchronized (this) {
final ObjectName jmxName = registeredMetrics.remove(metric);
......
......@@ -16,16 +16,16 @@
* limitations under the License.
*/
package org.apache.flink.metrics.reporter;
package org.apache.flink.metrics.jmx;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.util.TestReporter;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
......@@ -79,7 +79,6 @@ public class JMXReporterTest extends TestLogger {
@Test
public void testPortConflictHandling() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter.class.getName());
MetricRegistry reg = new MetricRegistry(cfg);
TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
......@@ -87,8 +86,8 @@ public class JMXReporterTest extends TestLogger {
JMXReporter rep1 = new JMXReporter();
JMXReporter rep2 = new JMXReporter();
Configuration cfg1 = new Configuration();
cfg1.setString("port", "9020-9035");
MetricConfig cfg1 = new MetricConfig();
cfg1.setProperty("port", "9020-9035");
rep1.open(cfg1);
rep2.open(cfg1);
......@@ -128,7 +127,6 @@ public class JMXReporterTest extends TestLogger {
@Test
public void testJMXAvailability() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter.class.getName());
MetricRegistry reg = new MetricRegistry(cfg);
TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
......@@ -136,8 +134,8 @@ public class JMXReporterTest extends TestLogger {
JMXReporter rep1 = new JMXReporter();
JMXReporter rep2 = new JMXReporter();
Configuration cfg1 = new Configuration();
cfg1.setString("port", "9040-9055");
MetricConfig cfg1 = new MetricConfig();
cfg1.setProperty("port", "9040-9055");
rep1.open(cfg1);
rep2.open(cfg1);
......@@ -197,7 +195,7 @@ public class JMXReporterTest extends TestLogger {
try {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter");
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName());
registry = new MetricRegistry(config);
......
......@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.junit.Test;
......@@ -40,18 +41,18 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class JobManagerMetricTest {
public class JMXJobManagerMetricTest {
/**
* Tests that metrics registered on the JobManager are actually accessible.
* Tests that metrics registered on the JobManager are actually accessible via JMX.
*
* @throws Exception
*/
@Test
public void testJobManagerMetricAccess() throws Exception {
public void testJobManagerJMXMetricAccess() throws Exception {
Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter");
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName());
flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--port 9060-9075");
......
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
log4j.rootLogger=OFF, testlogger
# A1 is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
<logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
<logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
<logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
<logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
</configuration>
\ No newline at end of file
......@@ -42,13 +42,20 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
......
......@@ -19,11 +19,11 @@
package org.apache.flink.metrics.statsd;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.Scheduled;
......@@ -61,7 +61,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
private InetSocketAddress address;
@Override
public void open(Configuration config) {
public void open(MetricConfig config) {
String host = config.getString(ARG_HOST, null);
int port = config.getInteger(ARG_PORT, -1);
......
......@@ -24,12 +24,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
......@@ -191,7 +192,7 @@ public class StatsDReporterTest extends TestLogger {
*/
public static class TestingStatsDReporter extends StatsDReporter {
@Override
public void open(Configuration configuration) {
public void open(MetricConfig configuration) {
// disable the socket creation
}
......
......@@ -34,9 +34,11 @@ under the License.
<packaging>pom</packaging>
<modules>
<module>flink-metrics-core</module>
<module>flink-metrics-dropwizard</module>
<module>flink-metrics-ganglia</module>
<module>flink-metrics-graphite</module>
<module>flink-metrics-jmx</module>
<module>flink-metrics-statsd</module>
</modules>
</project>
......@@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
......
......@@ -22,7 +22,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.buffer.Buffer;
......
......@@ -25,7 +25,7 @@ import java.nio.ByteOrder;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataOutputSerializer;
......
......@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
......
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
......
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
......
......@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import com.google.common.collect.Maps;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
......
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
......
......@@ -16,17 +16,17 @@
* limitations under the License.
*/
package org.apache.flink.metrics;
package org.apache.flink.runtime.metrics;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.groups.scope.ScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormats;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -39,10 +39,9 @@ import java.util.concurrent.TimeUnit;
* A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
* connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
*/
@Internal
public class MetricRegistry {
static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
private final MetricReporter reporter;
private final ScheduledExecutorService executor;
......@@ -75,7 +74,7 @@ public class MetricRegistry {
this.delimiter = delim;
// second, instantiate any custom configured reporters
final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
if (className == null) {
// by default, don't report anything
......@@ -90,7 +89,7 @@ public class MetricRegistry {
String configuredPeriod = config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
TimeUnit timeunit = TimeUnit.SECONDS;
long period = 10;
if (configuredPeriod != null) {
try {
String[] interval = configuredPeriod.split(" ");
......@@ -99,13 +98,13 @@ public class MetricRegistry {
}
catch (Exception e) {
LOG.error("Cannot parse report interval from config: " + configuredPeriod +
" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
"Using default reporting interval.");
" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
"Using default reporting interval.");
}
}
Configuration reporterConfig = createReporterConfig(config, timeunit, period);
MetricConfig reporterConfig = createReporterConfig(config);
Class<?> reporterClass = Class.forName(className);
reporter = (MetricReporter) reporterClass.newInstance();
reporter.open(reporterConfig);
......@@ -113,13 +112,13 @@ public class MetricRegistry {
if (reporter instanceof Scheduled) {
executor = Executors.newSingleThreadScheduledExecutor();
LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
}
}
catch (Throwable t) {
shutdownExecutor();
LOG.error("Could not instantiate metrics reporter. No metrics will be exposed/reported.", t);
LOG.info("Could not instantiate metrics reporter. No metrics will be exposed/reported.", t);
reporter = null;
}
......@@ -149,7 +148,7 @@ public class MetricRegistry {
}
shutdownExecutor();
}
private void shutdownExecutor() {
if (executor != null) {
executor.shutdown();
......@@ -179,7 +178,7 @@ public class MetricRegistry {
* @param metricName the name of the metric
* @param group the group that contains the metric
*/
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
public void register(Metric metric, String metricName, MetricGroup group) {
try {
if (reporter != null) {
reporter.notifyOfAddedMetric(metric, metricName, group);
......@@ -196,7 +195,7 @@ public class MetricRegistry {
* @param metricName the name of the metric
* @param group the group that contains the metric
*/
public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
public void unregister(Metric metric, String metricName, MetricGroup group) {
try {
if (reporter != null) {
reporter.notifyOfRemovedMetric(metric, metricName, group);
......@@ -209,16 +208,13 @@ public class MetricRegistry {
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
static Configuration createReporterConfig(Configuration config, TimeUnit timeunit, long period) {
Configuration reporterConfig = new Configuration();
reporterConfig.setLong("period", period);
reporterConfig.setString("timeunit", timeunit.name());
static MetricConfig createReporterConfig(Configuration config) {
MetricConfig reporterConfig = new MetricConfig();
String[] arguments = config.getString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "").split(" ");
if (arguments.length > 1) {
for (int x = 0; x < arguments.length; x += 2) {
reporterConfig.setString(arguments[x].replace("--", ""), arguments[x + 1]);
reporterConfig.setProperty(arguments[x].replace("--", ""), arguments[x + 1]);
}
}
return reporterConfig;
......@@ -226,18 +222,18 @@ public class MetricRegistry {
static ScopeFormats createScopeConfig(Configuration config) {
String jmFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
String jmJobFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
String tmFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
String tmJobFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
String taskFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
String operatorFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
}
......@@ -246,7 +242,7 @@ public class MetricRegistry {
/**
* This task is explicitly a static class, so that it does not hold any references to the enclosing
* MetricsRegistry instance.
*
*
* This is a subtle difference, but very important: With this static class, the enclosing class instance
* may become garbage-collectible, whereas with an anonymous inner class, the timer thread
* (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer.
......
......@@ -16,19 +16,18 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.scope.ScopeFormat;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -55,7 +54,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
* These metrics simply do not get reported any more, when created on a closed group.
*/
@Internal
public abstract class AbstractMetricGroup implements MetricGroup {
/** shared logger */
......@@ -93,6 +91,8 @@ public abstract class AbstractMetricGroup implements MetricGroup {
/**
* Gets the scope as an array of the scope components, for example
* {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
*
* @see #getMetricIdentifier(String)
*/
public String[] getScopeComponents() {
return scopeComponents;
......@@ -104,7 +104,7 @@ public abstract class AbstractMetricGroup implements MetricGroup {
*
* @param metricName metric name
* @return fully qualified metric name
*/
*/
public String getMetricIdentifier(String metricName) {
return getMetricIdentifier(metricName, null);
}
......
......@@ -16,10 +16,9 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistry;
/**
* Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g.,
......@@ -34,7 +33,6 @@ import org.apache.flink.metrics.MetricRegistry;
* group could for example include the task attempt number (more fine grained identification), or
* exclude it (for continuity of the namespace across failure and recovery).
*/
@Internal
public abstract class ComponentMetricGroup extends AbstractMetricGroup {
/**
......@@ -43,10 +41,7 @@ public abstract class ComponentMetricGroup extends AbstractMetricGroup {
* @param registry registry to register new metrics with
* @param scope the scope of the group
*/
public ComponentMetricGroup(
MetricRegistry registry,
String[] scope) {
public ComponentMetricGroup(MetricRegistry registry, String[] scope) {
super(registry, scope);
}
......@@ -71,8 +66,13 @@ public abstract class ComponentMetricGroup extends AbstractMetricGroup {
}
// ------------------------------------------------------------------------
// sub components
// Component Metric Group Specifics
// ------------------------------------------------------------------------
/**
* Gets all component metric groups that are contained in this component metric group.
*
* @return All component metric groups that are contained in this component metric group.
*/
protected abstract Iterable<? extends ComponentMetricGroup> subComponents();
}
......@@ -16,16 +16,14 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistry;
/**
* A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to hold
* subgroups of metrics.
*/
@Internal
public class GenericMetricGroup extends AbstractMetricGroup {
public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
......
......@@ -15,12 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat;
import javax.annotation.Nullable;
import java.util.Collections;
......@@ -31,7 +30,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
* a specific job, running on the JobManager.
*/
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup {
/** The metrics group that contains this group */
......@@ -62,6 +60,10 @@ public class JobManagerJobMetricGroup extends JobMetricGroup {
return parent;
}
// ------------------------------------------------------------------------
// Component Metric Group Specifics
// ------------------------------------------------------------------------
@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return Collections.emptyList();
......
......@@ -15,12 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat;
import java.util.HashMap;
import java.util.Map;
......@@ -31,7 +31,6 @@ import java.util.Map;
* <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do
* not contain tasks any more
*/
@Internal
public class JobManagerMetricGroup extends ComponentMetricGroup {
private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>();
......@@ -59,9 +58,9 @@ public class JobManagerMetricGroup extends ComponentMetricGroup {
// job groups
// ------------------------------------------------------------------------
public JobManagerJobMetricGroup addJob(
JobID jobId,
String jobName) {
public JobManagerJobMetricGroup addJob(JobGraph job) {
JobID jobId = job.getJobID();
String jobName = job.getName();
// get or create a jobs metric group
JobManagerJobMetricGroup currentJobGroup;
synchronized (this) {
......
......@@ -16,11 +16,11 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistry;
import javax.annotation.Nullable;
......
......@@ -16,11 +16,10 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.OperatorScopeFormat;
import java.util.Collections;
......@@ -29,7 +28,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator.
*/
@Internal
public class OperatorMetricGroup extends ComponentMetricGroup {
/** The task metric group that contains this operator metric groups */
......@@ -55,6 +53,10 @@ public class OperatorMetricGroup extends ComponentMetricGroup {
return parent;
}
// ------------------------------------------------------------------------
// Component Metric Group Specifics
// ------------------------------------------------------------------------
@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return Collections.emptyList();
......
......@@ -16,26 +16,27 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Metric group which forwards all registration calls to its parent metric group.
*
* @param <P> Type of the parent metric group
*/
@Internal
public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
private final P parentMetricGroup;
public ProxyMetricGroup(P parentMetricGroup) {
this.parentMetricGroup = Preconditions.checkNotNull(parentMetricGroup);
this.parentMetricGroup = checkNotNull(parentMetricGroup);
}
@Override
......@@ -87,4 +88,19 @@ public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
public final MetricGroup addGroup(String name) {
return parentMetricGroup.addGroup(name);
}
@Override
public String[] getScopeComponents() {
return parentMetricGroup.getScopeComponents();
}
@Override
public String getMetricIdentifier(String metricName) {
return parentMetricGroup.getMetricIdentifier(metricName);
}
@Override
public String getMetricIdentifier(String metricName, CharacterFilter filter) {
return parentMetricGroup.getMetricIdentifier(metricName, filter);
}
}
......@@ -15,12 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat;
import org.apache.flink.util.AbstractID;
import javax.annotation.Nullable;
......@@ -35,7 +35,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
*/
@Internal
public class TaskManagerJobMetricGroup extends JobMetricGroup {
/** The metrics group that contains this group */
......@@ -47,20 +46,20 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup {
// ------------------------------------------------------------------------
public TaskManagerJobMetricGroup(
MetricRegistry registry,
TaskManagerMetricGroup parent,
JobID jobId,
@Nullable String jobName) {
MetricRegistry registry,
TaskManagerMetricGroup parent,
JobID jobId,
@Nullable String jobName) {
this(registry, checkNotNull(parent), registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName);
}
public TaskManagerJobMetricGroup(
MetricRegistry registry,
TaskManagerMetricGroup parent,
TaskManagerJobScopeFormat scopeFormat,
JobID jobId,
@Nullable String jobName) {
MetricRegistry registry,
TaskManagerMetricGroup parent,
TaskManagerJobScopeFormat scopeFormat,
JobID jobId,
@Nullable String jobName) {
super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
......@@ -75,12 +74,12 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup {
// adding / removing tasks
// ------------------------------------------------------------------------
public TaskMetricGroup addTask(
AbstractID vertexId,
AbstractID executionId,
String taskName,
int subtaskIndex,
int attemptNumber) {
public TaskMetricGroup addTask(TaskDeploymentDescriptor tdd) {
AbstractID vertexId = tdd.getVertexID();
AbstractID executionId = tdd.getExecutionId();
String taskName = tdd.getTaskName();
int subtaskIndex = tdd.getIndexInSubtaskGroup();
int attemptNumber = tdd.getAttemptNumber();
checkNotNull(executionId);
......@@ -115,6 +114,10 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup {
}
}
// ------------------------------------------------------------------------
// Component Metric Group Specifics
// ------------------------------------------------------------------------
@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return tasks.values();
......
......@@ -16,13 +16,12 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
import java.util.HashMap;
import java.util.Map;
......@@ -33,7 +32,6 @@ import java.util.Map;
* <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do
* not contain tasks any more
*/
@Internal
public class TaskManagerMetricGroup extends ComponentMetricGroup {
private final Map<JobID, TaskManagerJobMetricGroup> jobs = new HashMap<>();
......@@ -69,14 +67,11 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
// job groups
// ------------------------------------------------------------------------
public TaskMetricGroup addTaskForJob(
JobID jobId,
String jobName,
AbstractID vertexID,
AbstractID executionId,
String taskName,
int subtaskIndex,
int attemptNumber) {
public TaskMetricGroup addTaskForJob(TaskDeploymentDescriptor tdd) {
JobID jobId = tdd.getJobID();
String jobName = tdd.getJobName().length() == 0
? tdd.getJobID().toString()
: tdd.getJobName();
// we cannot strictly lock both our map modification and the job group modification
// because it might lead to a deadlock
......@@ -94,8 +89,7 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
// try to add another task. this may fail if we found a pre-existing job metrics
// group and it is closed concurrently
TaskMetricGroup taskGroup = currentJobGroup.addTask(
vertexID, executionId, taskName, subtaskIndex, attemptNumber);
TaskMetricGroup taskGroup = currentJobGroup.addTask(tdd);
if (taskGroup != null) {
// successfully added the next task
......@@ -126,6 +120,10 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
return jobs.size();
}
// ------------------------------------------------------------------------
// Component Metric Group Specifics
// ------------------------------------------------------------------------
@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return jobs.values();
......
......@@ -16,11 +16,10 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.TaskScopeFormat;
import org.apache.flink.util.AbstractID;
import javax.annotation.Nullable;
......@@ -34,7 +33,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>Contains extra logic for adding operators.
*/
@Internal
public class TaskMetricGroup extends ComponentMetricGroup {
/** The job metrics group containing this task metrics group */
......@@ -137,6 +135,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
// ------------------------------------------------------------------------
// operators and cleanup
// ------------------------------------------------------------------------
public OperatorMetricGroup addOperator(String name) {
OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);
......@@ -160,6 +159,8 @@ public class TaskMetricGroup extends ComponentMetricGroup {
parent.removeTaskMetricGroup(executionId);
}
// ------------------------------------------------------------------------
// Component Metric Group Specifics
// ------------------------------------------------------------------------
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.metrics.scope;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
/**
 * The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobMetricGroup}
 * on the JobManager side.
 *
 * <p>The format may reference the JobManager host, the job ID and the job name.
 */
public class JobManagerJobScopeFormat extends ScopeFormat {

	public JobManagerJobScopeFormat(String format, JobManagerScopeFormat parentFormat) {
		// declare which scope variables a user-supplied format string may contain
		// (super(...) must be the first statement, so the array is passed inline)
		super(format, parentFormat, new String[] { SCOPE_ACTOR_HOST, SCOPE_JOB_ID, SCOPE_JOB_NAME });
	}

	/**
	 * Instantiates the scope for a concrete job.
	 *
	 * @param parent the JobManager metric group this job group belongs to
	 * @param jid the ID of the job (may be null; rendered as a null marker)
	 * @param jobName the name of the job (may be null; rendered as a null marker)
	 * @return the scope components with all format variables bound
	 */
	public String[] formatScope(JobManagerMetricGroup parent, JobID jid, String jobName) {
		// bind the declared variables, in declaration order, into a fresh copy of the template
		final String[] bindings = {
				parent.hostname(),
				valueOrNull(jid),
				valueOrNull(jobName)
		};
		return bindVariables(copyTemplate(), bindings);
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.metrics.scope;
/**
* The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup}.
*/
public class JobManagerScopeFormat extends ScopeFormat {

	/**
	 * Creates a new scope format for the JobManager component itself.
	 *
	 * @param format the user-defined scope format string; the JobManager is a
	 *               root component, so there is no parent format
	 */
	public JobManagerScopeFormat(String format) {
		super(format, null, new String[] { SCOPE_ACTOR_HOST });
	}

	/**
	 * Binds the scope variables to the concrete values of this JobManager.
	 *
	 * @param hostname the hostname of the JobManager
	 * @return the fully bound scope components
	 */
	public String[] formatScope(String hostname) {
		return bindVariables(copyTemplate(), new String[] { hostname });
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.metrics.scope;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
/**
* The scope format for the {@link org.apache.flink.runtime.metrics.groups.OperatorMetricGroup}.
*/
public class OperatorScopeFormat extends ScopeFormat {

	/**
	 * Creates a new scope format for operator metrics.
	 *
	 * @param format the user-defined scope format string
	 * @param parentFormat the scope format of the enclosing task group,
	 *                     used when the format starts with the inherit marker
	 */
	public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) {
		super(format, parentFormat, new String[] {
			SCOPE_ACTOR_HOST,
			SCOPE_TASKMANAGER_ID,
			SCOPE_JOB_ID,
			SCOPE_JOB_NAME,
			SCOPE_TASK_VERTEX_ID,
			SCOPE_TASK_ATTEMPT_ID,
			SCOPE_TASK_NAME,
			SCOPE_TASK_SUBTASK_INDEX,
			SCOPE_TASK_ATTEMPT_NUM,
			SCOPE_OPERATOR_NAME
		});
	}

	/**
	 * Binds the scope variables to the concrete values of the given operator.
	 * The task / job / TaskManager values are taken from the parent groups.
	 *
	 * @param parent the task metric group that contains this operator group
	 * @param operatorName the name of the operator
	 * @return the fully bound scope components
	 */
	public String[] formatScope(TaskMetricGroup parent, String operatorName) {
		final String[] boundValues = {
			parent.parent().parent().hostname(),
			parent.parent().parent().taskManagerId(),
			valueOrNull(parent.parent().jobId()),
			valueOrNull(parent.parent().jobName()),
			valueOrNull(parent.vertexId()),
			valueOrNull(parent.executionId()),
			valueOrNull(parent.taskName()),
			String.valueOf(parent.subtaskIndex()),
			String.valueOf(parent.attemptNumber()),
			valueOrNull(operatorName)
		};
		return bindVariables(copyTemplate(), boundValues);
	}
}
\ No newline at end of file
......@@ -16,16 +16,9 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups.scope;
package org.apache.flink.runtime.metrics.scope;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.metrics.groups.JobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.util.AbstractID;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -35,8 +28,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* This class represents the format after which the "scope" (or namespace) of the various
* component metric groups is built. Component metric groups
* ({@link org.apache.flink.metrics.groups.ComponentMetricGroup}), are for example
* component metric groups is built. Component metric groups are for example
* "TaskManager", "Task", or "Operator".
*
* <p>User defined scope formats allow users to include or exclude
......@@ -61,8 +53,8 @@ public abstract class ScopeFormat {
* If the scope format starts with this character, then the parent components scope
* format will be used as a prefix.
*
* <p>For example, if the {@link JobMetricGroup} format is {@code "*.<job_name>"}, and the
* {@link TaskManagerMetricGroup} format is {@code "<host>"}, then the job's metrics
* <p>For example, if the TaskManager's job format is {@code "*.<job_name>"}, and the
* TaskManager format is {@code "<host>"}, then the job's metrics
* will have {@code "<host>.<job_name>"} as their scope.
*/
public static final String SCOPE_INHERIT_PARENT = "*";
......@@ -147,182 +139,7 @@ public abstract class ScopeFormat {
* {@code "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */
public static final String DEFAULT_SCOPE_OPERATOR_GROUP =
concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_OPERATOR_COMPONENT);
// ------------------------------------------------------------------------
// Formatters form the individual component types
// ------------------------------------------------------------------------
/**
* The scope format for the {@link JobManagerMetricGroup}.
*/
public static class JobManagerScopeFormat extends ScopeFormat {
public JobManagerScopeFormat(String format) {
super(format, null, new String[] {
SCOPE_ACTOR_HOST
});
}
public String[] formatScope(String hostname) {
final String[] template = copyTemplate();
final String[] values = { hostname };
return bindVariables(template, values);
}
}
/**
* The scope format for the {@link TaskManagerMetricGroup}.
*/
public static class TaskManagerScopeFormat extends ScopeFormat {
public TaskManagerScopeFormat(String format) {
super(format, null, new String[] {
SCOPE_ACTOR_HOST,
SCOPE_TASKMANAGER_ID
});
}
public String[] formatScope(String hostname, String taskManagerId) {
final String[] template = copyTemplate();
final String[] values = { hostname, taskManagerId };
return bindVariables(template, values);
}
}
// ------------------------------------------------------------------------
/**
* The scope format for the {@link JobMetricGroup}.
*/
public static class JobManagerJobScopeFormat extends ScopeFormat {
public JobManagerJobScopeFormat(String format, JobManagerScopeFormat parentFormat) {
super(format, parentFormat, new String[] {
SCOPE_ACTOR_HOST,
SCOPE_JOB_ID,
SCOPE_JOB_NAME
});
}
public String[] formatScope(JobManagerMetricGroup parent, JobID jid, String jobName) {
final String[] template = copyTemplate();
final String[] values = {
parent.hostname(),
valueOrNull(jid),
valueOrNull(jobName)
};
return bindVariables(template, values);
}
}
/**
* The scope format for the {@link JobMetricGroup}.
*/
public static class TaskManagerJobScopeFormat extends ScopeFormat {
public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) {
super(format, parentFormat, new String[] {
SCOPE_ACTOR_HOST,
SCOPE_TASKMANAGER_ID,
SCOPE_JOB_ID,
SCOPE_JOB_NAME
});
}
public String[] formatScope(TaskManagerMetricGroup parent, JobID jid, String jobName) {
final String[] template = copyTemplate();
final String[] values = {
parent.hostname(),
parent.taskManagerId(),
valueOrNull(jid),
valueOrNull(jobName)
};
return bindVariables(template, values);
}
}
// ------------------------------------------------------------------------
/**
* The scope format for the {@link TaskMetricGroup}.
*/
public static class TaskScopeFormat extends ScopeFormat {
public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) {
super(format, parentFormat, new String[] {
SCOPE_ACTOR_HOST,
SCOPE_TASKMANAGER_ID,
SCOPE_JOB_ID,
SCOPE_JOB_NAME,
SCOPE_TASK_VERTEX_ID,
SCOPE_TASK_ATTEMPT_ID,
SCOPE_TASK_NAME,
SCOPE_TASK_SUBTASK_INDEX,
SCOPE_TASK_ATTEMPT_NUM
});
}
public String[] formatScope(
TaskManagerJobMetricGroup parent,
AbstractID vertexId, AbstractID attemptId,
String taskName, int subtask, int attemptNumber) {
final String[] template = copyTemplate();
final String[] values = {
parent.parent().hostname(),
parent.parent().taskManagerId(),
valueOrNull(parent.jobId()),
valueOrNull(parent.jobName()),
valueOrNull(vertexId),
valueOrNull(attemptId),
valueOrNull(taskName),
String.valueOf(subtask),
String.valueOf(attemptNumber)
};
return bindVariables(template, values);
}
}
// ------------------------------------------------------------------------
/**
* The scope format for the {@link org.apache.flink.metrics.groups.OperatorMetricGroup}.
*/
public static class OperatorScopeFormat extends ScopeFormat {
public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) {
super(format, parentFormat, new String[] {
SCOPE_ACTOR_HOST,
SCOPE_TASKMANAGER_ID,
SCOPE_JOB_ID,
SCOPE_JOB_NAME,
SCOPE_TASK_VERTEX_ID,
SCOPE_TASK_ATTEMPT_ID,
SCOPE_TASK_NAME,
SCOPE_TASK_SUBTASK_INDEX,
SCOPE_TASK_ATTEMPT_NUM,
SCOPE_OPERATOR_NAME
});
}
public String[] formatScope(TaskMetricGroup parent, String operatorName) {
final String[] template = copyTemplate();
final String[] values = {
parent.parent().parent().hostname(),
parent.parent().parent().taskManagerId(),
valueOrNull(parent.parent().jobId()),
valueOrNull(parent.parent().jobName()),
valueOrNull(parent.vertexId()),
valueOrNull(parent.executionId()),
valueOrNull(parent.taskName()),
String.valueOf(parent.subtaskIndex()),
String.valueOf(parent.attemptNumber()),
valueOrNull(operatorName)
};
return bindVariables(template, values);
}
}
// ------------------------------------------------------------------------
// Scope Format Base
......@@ -466,12 +283,12 @@ public abstract class ScopeFormat {
return sb.toString();
}
static String valueOrNull(Object value) {
protected static String valueOrNull(Object value) {
return (value == null || (value instanceof String && ((String) value).isEmpty())) ?
"null" : value.toString();
}
static HashMap<String, Integer> arrayToMap(String[] array) {
protected static HashMap<String, Integer> arrayToMap(String[] array) {
HashMap<String, Integer> map = new HashMap<>(array.length);
for (int i = 0; i < array.length; i++) {
map.put(array[i], i);
......
......@@ -16,22 +16,16 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups.scope;
package org.apache.flink.runtime.metrics.scope;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A container for component scope formats.
*/
@Internal
public class ScopeFormats {
private final JobManagerScopeFormat jobManagerFormat;
......@@ -50,16 +44,16 @@ public class ScopeFormats {
this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
this.jobManagerJobFormat = new JobManagerJobScopeFormat(
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat);
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat);
this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat);
this.taskFormat = new TaskScopeFormat(
ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, this.taskManagerJobFormat);
this.operatorFormat = new OperatorScopeFormat(
ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, this.taskFormat);
}
......@@ -102,6 +96,8 @@ public class ScopeFormats {
this.operatorFormat = checkNotNull(operatorFormat);
}
// ------------------------------------------------------------------------
// Accessors
// ------------------------------------------------------------------------
public JobManagerScopeFormat getJobManagerFormat() {
......@@ -127,4 +123,31 @@ public class ScopeFormats {
public OperatorScopeFormat getOperatorFormat() {
return this.operatorFormat;
}
// ------------------------------------------------------------------------
// Parsing from Config
// ------------------------------------------------------------------------
/**
* Creates the scope formats as defined in the given configuration
*
* @param config The configuration that defines the formats
* @return The ScopeFormats parsed from the configuration
*/
public static ScopeFormats fromConfig(Configuration config) {
String jmFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
String jmJobFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
String tmFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
String tmJobFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
String taskFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
String operatorFormat = config.getString(
ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.metrics.scope;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
/**
* The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobMetricGroup}.
*/
public class TaskManagerJobScopeFormat extends ScopeFormat {

	/**
	 * Creates a new scope format for per-job metrics on a TaskManager.
	 *
	 * @param format the user-defined scope format string
	 * @param parentFormat the scope format of the parent TaskManager group,
	 *                     used when the format starts with the inherit marker
	 */
	public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) {
		super(format, parentFormat, new String[] {
			SCOPE_ACTOR_HOST,
			SCOPE_TASKMANAGER_ID,
			SCOPE_JOB_ID,
			SCOPE_JOB_NAME
		});
	}

	/**
	 * Binds the scope variables to the concrete values of the given job.
	 *
	 * @param parent the TaskManager metric group that contains this job group
	 * @param jid the ID of the job
	 * @param jobName the name of the job
	 * @return the fully bound scope components
	 */
	public String[] formatScope(TaskManagerMetricGroup parent, JobID jid, String jobName) {
		final String[] boundValues = {
			parent.hostname(),
			parent.taskManagerId(),
			valueOrNull(jid),
			valueOrNull(jobName)
		};
		return bindVariables(copyTemplate(), boundValues);
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.metrics.scope;
/**
* The scope format for the {@link org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup}.
*/
public class TaskManagerScopeFormat extends ScopeFormat {

	/**
	 * Creates a new scope format for the TaskManager component itself.
	 *
	 * @param format the user-defined scope format string; the TaskManager is a
	 *               root component, so there is no parent format
	 */
	public TaskManagerScopeFormat(String format) {
		super(format, null, new String[] {
			SCOPE_ACTOR_HOST,
			SCOPE_TASKMANAGER_ID
		});
	}

	/**
	 * Binds the scope variables to the concrete values of this TaskManager.
	 *
	 * @param hostname the hostname of the TaskManager
	 * @param taskManagerId the ID of the TaskManager
	 * @return the fully bound scope components
	 */
	public String[] formatScope(String hostname, String taskManagerId) {
		return bindVariables(copyTemplate(), new String[] { hostname, taskManagerId });
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.metrics.scope;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.util.AbstractID;
/**
* The scope format for the {@link org.apache.flink.runtime.metrics.groups.TaskMetricGroup}.
*/
public class TaskScopeFormat extends ScopeFormat {

	/**
	 * Creates a new scope format for task metrics.
	 *
	 * @param format the user-defined scope format string
	 * @param parentFormat the scope format of the enclosing TaskManager job
	 *                     group, used when the format starts with the inherit marker
	 */
	public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) {
		super(format, parentFormat, new String[] {
			SCOPE_ACTOR_HOST,
			SCOPE_TASKMANAGER_ID,
			SCOPE_JOB_ID,
			SCOPE_JOB_NAME,
			SCOPE_TASK_VERTEX_ID,
			SCOPE_TASK_ATTEMPT_ID,
			SCOPE_TASK_NAME,
			SCOPE_TASK_SUBTASK_INDEX,
			SCOPE_TASK_ATTEMPT_NUM
		});
	}

	/**
	 * Binds the scope variables to the concrete values of the given task
	 * execution attempt. Job and TaskManager values come from the parent groups.
	 *
	 * @param parent the TaskManager job metric group that contains this task group
	 * @param vertexId the ID of the task's job vertex
	 * @param attemptId the ID of this execution attempt
	 * @param taskName the name of the task
	 * @param subtask the index of the parallel subtask
	 * @param attemptNumber the number of this execution attempt
	 * @return the fully bound scope components
	 */
	public String[] formatScope(
			TaskManagerJobMetricGroup parent,
			AbstractID vertexId, AbstractID attemptId,
			String taskName, int subtask, int attemptNumber) {

		final String[] boundValues = {
			parent.parent().hostname(),
			parent.parent().taskManagerId(),
			valueOrNull(parent.jobId()),
			valueOrNull(parent.jobName()),
			valueOrNull(vertexId),
			valueOrNull(attemptId),
			valueOrNull(taskName),
			String.valueOf(subtask),
			String.valueOf(attemptNumber)
		};
		return bindVariables(copyTemplate(), boundValues);
	}
}
\ No newline at end of file
......@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
......
......@@ -25,7 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
......
......@@ -35,8 +35,8 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.metrics.{Gauge, MetricGroup, MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.metrics.groups.{JobManagerMetricGroup, UnregisteredMetricsGroup}
import org.apache.flink.metrics.{Gauge, MetricGroup}
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.BlobServer
......@@ -71,6 +71,8 @@ import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, Abstract
import org.apache.flink.runtime.messages.webmonitor.InfoMessage
import org.apache.flink.runtime.messages.webmonitor._
import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
......@@ -1114,7 +1116,7 @@ class JobManager(
val jobMetrics = jobManagerMetricGroup match {
case Some(group) =>
group.addJob(jobGraph.getJobID, jobGraph.getName) match {
group.addJob(jobGraph) match {
case (jobGroup:Any) => jobGroup
case null => new UnregisteredMetricsGroup()
}
......
......@@ -38,8 +38,7 @@ import grizzled.slf4j.Logger
import org.apache.flink.configuration._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
import org.apache.flink.metrics.groups.TaskManagerMetricGroup
import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge, MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.clusterframework.messages.StopCluster
import org.apache.flink.runtime.clusterframework.types.ResourceID
......@@ -65,6 +64,8 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStack
import org.apache.flink.runtime.messages.TaskManagerMessages._
import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
......@@ -1092,11 +1093,7 @@ class TaskManager(
jobName = tdd.getJobName
}
val taskMetricGroup = taskManagerMetricGroup
.addTaskForJob(
tdd.getJobID, jobName,
tdd.getVertexID, tdd.getExecutionId, tdd.getTaskName,
tdd.getIndexInSubtaskGroup, tdd.getAttemptNumber)
val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd)
val task = new Task(
tdd,
......
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricRegistry
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
......@@ -30,6 +29,7 @@ import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.metrics.MetricRegistry
import scala.concurrent.duration._
import scala.language.postfixOps
......
......@@ -23,10 +23,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
......@@ -44,6 +42,8 @@ import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
......@@ -274,18 +274,18 @@ public class ExecutionGraphMetricsTest extends TestLogger {
private final Map<String, Metric> metrics = new HashMap<>();
@Override
public void open(Configuration config) {}
public void open(MetricConfig config) {}
@Override
public void close() {}
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
metrics.put(metricName, metric);
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
metrics.remove(metricName);
}
......
......@@ -16,15 +16,18 @@
* limitations under the License.
*/
package org.apache.flink.metrics;
package org.apache.flink.runtime.metrics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.groups.scope.ScopeFormats;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.metrics.util.TestReporter;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
......@@ -53,7 +56,7 @@ public class MetricRegistryTest extends TestLogger {
public static boolean wasOpened = false;
@Override
public void open(Configuration config) {
public void open(MetricConfig config) {
wasOpened = true;
}
}
......@@ -73,7 +76,7 @@ public class MetricRegistryTest extends TestLogger {
protected static class TestReporter2 extends TestReporter {
@Override
public void open(Configuration config) {
public void open(MetricConfig config) {
Assert.assertEquals("hello", config.getString("arg1", null));
Assert.assertEquals("world", config.getString("arg2", null));
}
......@@ -144,14 +147,14 @@ public class MetricRegistryTest extends TestLogger {
public static boolean removeCalled = false;
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
addCalled = true;
assertTrue(metric instanceof Counter);
assertEquals("rootCounter", metricName);
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
removeCalled = true;
Assert.assertTrue(metric instanceof Counter);
Assert.assertEquals("rootCounter", metricName);
......
......@@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
......@@ -44,9 +45,9 @@ public class JobManagerGroupTest {
final String jobName1 = "testjob";
final String jobName2 = "anotherJob";
JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1);
JobManagerJobMetricGroup jmJobGroup12 = group.addJob(jid1, jobName1);
JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2);
JobManagerJobMetricGroup jmJobGroup11 = group.addJob(new JobGraph(jid1, jobName1));
JobManagerJobMetricGroup jmJobGroup12 = group.addJob(new JobGraph(jid1, jobName1));
JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new JobGraph(jid2, jobName2));
assertEquals(jmJobGroup11, jmJobGroup12);
......@@ -74,8 +75,8 @@ public class JobManagerGroupTest {
final String jobName1 = "testjob";
final String jobName2 = "anotherJob";
JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1);
JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2);
JobManagerJobMetricGroup jmJobGroup11 = group.addJob(new JobGraph(jid1, jobName1));
JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new JobGraph(jid2, jobName2));
group.close();
......
......@@ -16,13 +16,13 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat;
import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
......
......@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
......@@ -25,8 +25,8 @@ import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.util.TestReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.junit.Assert;
import org.junit.Test;
......@@ -88,7 +88,7 @@ public class MetricGroupRegistrationTest {
public static String lastPassedName;
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
lastPassedMetric = metric;
lastPassedName = metricName;
}
......
......@@ -16,13 +16,13 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.junit.After;
import org.junit.Before;
......@@ -120,12 +120,12 @@ public class MetricGroupTest {
}
@Override
public void register(Metric metric, String name, AbstractMetricGroup parent) {
public void register(Metric metric, String name, MetricGroup parent) {
fail("Metric should never be registered");
}
@Override
public void unregister(Metric metric, String name, AbstractMetricGroup parent) {
public void unregister(Metric metric, String name, MetricGroup parent) {
fail("Metric should never be un-registered");
}
}
......
......@@ -16,11 +16,11 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.util.AbstractID;
import org.junit.Test;
......
......@@ -16,16 +16,28 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import static org.junit.Assert.*;
public class TaskManagerGroupTest {
......@@ -35,7 +47,7 @@ public class TaskManagerGroupTest {
// ------------------------------------------------------------------------
@Test
public void addAndRemoveJobs() {
public void addAndRemoveJobs() throws IOException {
MetricRegistry registry = new MetricRegistry(new Configuration());
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
......@@ -48,19 +60,79 @@ public class TaskManagerGroupTest {
final String jobName1 = "testjob";
final String jobName2 = "anotherJob";
final AbstractID vertex11 = new AbstractID();
final AbstractID vertex12 = new AbstractID();
final AbstractID vertex13 = new AbstractID();
final AbstractID vertex21 = new AbstractID();
final AbstractID execution11 = new AbstractID();
final AbstractID execution12 = new AbstractID();
final AbstractID execution13 = new AbstractID();
final AbstractID execution21 = new AbstractID();
final JobVertexID vertex11 = new JobVertexID();
final JobVertexID vertex12 = new JobVertexID();
final JobVertexID vertex13 = new JobVertexID();
final JobVertexID vertex21 = new JobVertexID();
final ExecutionAttemptID execution11 = new ExecutionAttemptID();
final ExecutionAttemptID execution12 = new ExecutionAttemptID();
final ExecutionAttemptID execution13 = new ExecutionAttemptID();
final ExecutionAttemptID execution21 = new ExecutionAttemptID();
TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
jid1,
jobName1,
vertex11,
execution11,
new SerializedValue<>(new ExecutionConfig()),
"test",
17, 18, 0,
new Configuration(), new Configuration(),
"",
new ArrayList<ResultPartitionDeploymentDescriptor>(),
new ArrayList<InputGateDeploymentDescriptor>(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(), 0);
TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
jid1,
jobName1,
vertex12,
execution12,
new SerializedValue<>(new ExecutionConfig()),
"test",
13, 18, 1,
new Configuration(), new Configuration(),
"",
new ArrayList<ResultPartitionDeploymentDescriptor>(),
new ArrayList<InputGateDeploymentDescriptor>(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(), 0);
TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
jid2,
jobName2,
vertex21,
execution21,
new SerializedValue<>(new ExecutionConfig()),
"test",
7, 18, 2,
new Configuration(), new Configuration(),
"",
new ArrayList<ResultPartitionDeploymentDescriptor>(),
new ArrayList<InputGateDeploymentDescriptor>(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(), 0);
TaskDeploymentDescriptor tdd4 = new TaskDeploymentDescriptor(
jid1,
jobName1,
vertex13,
execution13,
new SerializedValue<>(new ExecutionConfig()),
"test",
0, 18, 0,
new Configuration(), new Configuration(),
"",
new ArrayList<ResultPartitionDeploymentDescriptor>(),
new ArrayList<InputGateDeploymentDescriptor>(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(), 0);
TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, "test", 17, 0);
TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, "test", 13, 1);
TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, "test", 7, 2);
TaskMetricGroup tmGroup11 = group.addTaskForJob(tdd1);
TaskMetricGroup tmGroup12 = group.addTaskForJob(tdd2);
TaskMetricGroup tmGroup21 = group.addTaskForJob(tdd3);
assertEquals(2, group.numRegisteredJobMetricGroups());
assertFalse(tmGroup11.parent().isClosed());
......@@ -80,7 +152,7 @@ public class TaskManagerGroupTest {
assertEquals(1, group.numRegisteredJobMetricGroups());
// add one more to job one
TaskMetricGroup tmGroup13 = group.addTaskForJob(jid1, jobName1, vertex13, execution13, "test", 0, 0);
TaskMetricGroup tmGroup13 = group.addTaskForJob(tdd4);
tmGroup12.close();
tmGroup13.close();
......@@ -94,7 +166,7 @@ public class TaskManagerGroupTest {
}
@Test
public void testCloseClosesAll() {
public void testCloseClosesAll() throws IOException {
MetricRegistry registry = new MetricRegistry(new Configuration());
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
registry, "localhost", new AbstractID().toString());
......@@ -106,17 +178,62 @@ public class TaskManagerGroupTest {
final String jobName1 = "testjob";
final String jobName2 = "anotherJob";
final AbstractID vertex11 = new AbstractID();
final AbstractID vertex12 = new AbstractID();
final AbstractID vertex21 = new AbstractID();
final JobVertexID vertex11 = new JobVertexID();
final JobVertexID vertex12 = new JobVertexID();
final JobVertexID vertex21 = new JobVertexID();
final ExecutionAttemptID execution11 = new ExecutionAttemptID();
final ExecutionAttemptID execution12 = new ExecutionAttemptID();
final ExecutionAttemptID execution21 = new ExecutionAttemptID();
TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
jid1,
jobName1,
vertex11,
execution11,
new SerializedValue<>(new ExecutionConfig()),
"test",
17, 18, 0,
new Configuration(), new Configuration(),
"",
new ArrayList<ResultPartitionDeploymentDescriptor>(),
new ArrayList<InputGateDeploymentDescriptor>(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(), 0);
TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
jid1,
jobName1,
vertex12,
execution12,
new SerializedValue<>(new ExecutionConfig()),
"test",
13, 18, 1,
new Configuration(), new Configuration(),
"",
new ArrayList<ResultPartitionDeploymentDescriptor>(),
new ArrayList<InputGateDeploymentDescriptor>(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(), 0);
final AbstractID execution11 = new AbstractID();
final AbstractID execution12 = new AbstractID();
final AbstractID execution21 = new AbstractID();
TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
jid2,
jobName2,
vertex21,
execution21,
new SerializedValue<>(new ExecutionConfig()),
"test",
7, 18, 1,
new Configuration(), new Configuration(),
"",
new ArrayList<ResultPartitionDeploymentDescriptor>(),
new ArrayList<InputGateDeploymentDescriptor>(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(), 0);
TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, "test", 17, 1);
TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, "test", 13, 2);
TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, "test", 7, 1);
TaskMetricGroup tmGroup11 = group.addTaskForJob(tdd1);
TaskMetricGroup tmGroup12 = group.addTaskForJob(tdd2);
TaskMetricGroup tmGroup21 = group.addTaskForJob(tdd3);
group.close();
......
......@@ -16,13 +16,13 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat;
import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
import org.junit.Test;
......
......@@ -16,16 +16,17 @@
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat;
import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
import org.apache.flink.runtime.metrics.scope.TaskScopeFormat;
import org.apache.flink.util.AbstractID;
import org.junit.After;
......@@ -149,13 +150,13 @@ public class TaskMetricGroupTest {
}
@Override
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
public void register(Metric metric, String metricName, MetricGroup group) {
super.register(metric, metricName, group);
counter++;
}
@Override
public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
public void unregister(Metric metric, String metricName, MetricGroup group) {
super.unregister(metric, metricName, group);
counter--;
}
......
......@@ -16,26 +16,26 @@
* limitations under the License.
*/
package org.apache.flink.metrics.util;
package org.apache.flink.runtime.metrics.util;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.AbstractReporter;
public class TestReporter extends AbstractReporter {
@Override
public void open(Configuration config) {}
public void open(MetricConfig config) {}
@Override
public void close() {}
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {}
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {}
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {}
@Override
public String filterCharacters(String input) {
......
......@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
......
......@@ -19,12 +19,12 @@
package org.apache.flink.runtime.operators.testutils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
......@@ -39,10 +39,10 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
......
......@@ -22,12 +22,12 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
......
......@@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
......
......@@ -17,11 +17,10 @@
*/
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
......
......@@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
......
......@@ -127,6 +127,13 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-jmx</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.10</artifactId>
......
......@@ -103,6 +103,13 @@ under the License.
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-jmx</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
......
......@@ -124,6 +124,13 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-jmx</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
......
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
......@@ -101,7 +102,7 @@ public abstract class KafkaTestBase extends TestLogger {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName());
flink = new ForkableFlinkMiniCluster(flinkConfig, false);
flink.start();
......
......@@ -24,7 +24,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
......
......@@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
......
......@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.event.AbstractEvent;
......
......@@ -23,7 +23,7 @@ import akka.actor.ActorRef;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
......
......@@ -24,13 +24,13 @@ import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.metrics.MetricRegistry
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
import scala.concurrent.duration.FiniteDuration
......
......@@ -24,7 +24,6 @@ import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
import org.apache.flink.metrics.MetricRegistry
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.clusterframework.ApplicationStatus
......@@ -35,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
import org.apache.flink.runtime.messages.Messages.Acknowledge
import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册