提交 ee3c7a88 编写于 作者: T Till Rohrmann 提交者: zentol

[FLINK-3951] Add Histogram metric type

This closes #2112
上级 d43bf8d9
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics;
import org.apache.flink.annotation.PublicEvolving;
/**
* Histogram interface to be used with Flink's metrics system.
*
* The histogram allows to record values, get the current count of recorded values and create
* histogram statistics for the currently seen elements.
*/
@PublicEvolving
public interface Histogram extends Metric {
/**
* Update the histogram with the given value.
*
* @param value Value to update the histogram with
*/
void update(long value);
/**
* Get the count of seen elements.
*
* @return Count of seen elements
*/
long getCount();
/**
* Create statistics for the currently recorded elements.
*
* @return Statistics about the currently recorded elements
*/
HistogramStatistics getStatistics();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics;
import org.apache.flink.annotation.PublicEvolving;
/**
* Histogram statistics represent the current snapshot of elements recorded in the histogram.
*
* The histogram statistics allow to calculate values for quantiles, the mean, the standard
* deviation, the minimum and the maximum.
*/
@PublicEvolving
public abstract class HistogramStatistics {
/**
* Returns the value for the given quantile based on the represented histogram statistics.
*
* @param quantile Quantile to calculate the value for
* @return Value for the given quantile
*/
public abstract double getQuantile(double quantile);
/**
* Returns the elements of the statistics' sample
*
* @return Elements of the statistics' sample
*/
public abstract long[] getValues();
/**
* Returns the size of the statistics' sample
*
* @return Size of the statistics' sample
*/
public abstract int size();
/**
* Returns the mean of the histogram values.
*
* @return Mean of the histogram values
*/
public abstract double getMean();
/**
* Returns the standard deviation of the distribution reflected by the histogram statistics.
*
* @return Standard deviation of histogram distribution
*/
public abstract double getStdDev();
/**
* Returns the maximum value of the histogram.
*
* @return Maximum value of the histogram
*/
public abstract long getMax();
/**
* Returns the minimum value of the histogram.
*
* @return Minimum value of the histogram
*/
public abstract long getMin();
}
......@@ -115,6 +115,26 @@ public interface MetricGroup {
*/
<T, G extends Gauge<T>> G gauge(String name, G gauge);
/**
* Registers a new {@link Histogram} with Flink.
*
* @param name name of the histogram
* @param histogram histogram to register
* @param <H> histogram type
* @return the registered histogram
*/
<H extends Histogram> H histogram(String name, H histogram);
/**
* Registers a new {@link Histogram} with Flink.
*
* @param name name of the histogram
* @param histogram histogram to register
* @param <H> histogram type
* @return the registered histogram
*/
<H extends Histogram> H histogram(int name, H histogram);
// ------------------------------------------------------------------------
// Groups
// ------------------------------------------------------------------------
......
......@@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
......@@ -172,6 +173,17 @@ public abstract class AbstractMetricGroup implements MetricGroup {
return gauge;
}
@Override
public <H extends Histogram> H histogram(int name, H histogram) {
return histogram(String.valueOf(name), histogram);
}
@Override
public <H extends Histogram> H histogram(String name, H histogram) {
addMetric(name, histogram);
return histogram;
}
/**
* Adds the given metric to the group and registers it at the registry, if the group
* is not yet closed, and if no metric with the same name has been registered before.
......
......@@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
......@@ -71,7 +72,16 @@ public class UnregisteredMetricsGroup implements MetricGroup {
return gauge;
}
@Override
public <H extends Histogram> H histogram(int name, H histogram) {
return histogram;
}
@Override
public <H extends Histogram> H histogram(String name, H histogram) {
return histogram;
}
@Override
public MetricGroup addGroup(int name) {
return addGroup(String.valueOf(name));
......
......@@ -21,8 +21,11 @@ package org.apache.flink.metrics.reporter;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
......@@ -32,9 +35,11 @@ import java.util.Map;
*/
@PublicEvolving
public abstract class AbstractReporter implements MetricReporter {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected final Map<Gauge<?>, String> gauges = new HashMap<>();
protected final Map<Counter, String> counters = new HashMap<>();
protected final Map<Histogram, String> histograms = new HashMap<>();
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
......@@ -45,6 +50,11 @@ public abstract class AbstractReporter implements MetricReporter {
counters.put((Counter) metric, name);
} else if (metric instanceof Gauge) {
gauges.put((Gauge<?>) metric, name);
} else if (metric instanceof Histogram) {
histograms.put((Histogram) metric, name);
} else {
log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
}
}
}
......@@ -56,6 +66,11 @@ public abstract class AbstractReporter implements MetricReporter {
counters.remove(metric);
} else if (metric instanceof Gauge) {
gauges.remove(metric);
} else if (metric instanceof Histogram) {
histograms.remove(metric);
} else {
log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
}
}
}
......
......@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.util.NetUtils;
......@@ -146,8 +147,11 @@ public class JMXReporter implements MetricReporter {
jmxMetric = new JmxGauge((Gauge<?>) metric);
} else if (metric instanceof Counter) {
jmxMetric = new JmxCounter((Counter) metric);
} else if (metric instanceof Histogram) {
jmxMetric = new JmxHistogram((Histogram) metric);
} else {
LOG.error("Unknown metric type: " + metric.getClass().getName());
LOG.error("Cannot add unknown metric type: {}. This indicates that the metric type " +
"is not supported by this reporter.", metric.getClass().getName());
return;
}
......@@ -285,7 +289,7 @@ public class JMXReporter implements MetricReporter {
private static class JmxCounter extends AbstractBean implements JmxCounterMBean {
private Counter counter;
public JmxCounter(Counter counter) {
JmxCounter(Counter counter) {
this.counter = counter;
}
......@@ -303,7 +307,7 @@ public class JMXReporter implements MetricReporter {
private final Gauge<?> gauge;
public JmxGauge(Gauge<?> gauge) {
JmxGauge(Gauge<?> gauge) {
this.gauge = gauge;
}
......@@ -313,6 +317,94 @@ public class JMXReporter implements MetricReporter {
}
}
public interface JmxHistogramMBean extends MetricMBean {
long getCount();
double getMean();
double getStdDev();
long getMax();
long getMin();
double getMedian();
double get75thPercentile();
double get95thPercentile();
double get98thPercentile();
double get99thPercentile();
double get999thPercentile();
}
private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean {
private final Histogram histogram;
JmxHistogram(Histogram histogram) {
this.histogram = histogram;
}
@Override
public long getCount() {
return histogram.getCount();
}
@Override
public double getMean() {
return histogram.getStatistics().getMean();
}
@Override
public double getStdDev() {
return histogram.getStatistics().getStdDev();
}
@Override
public long getMax() {
return histogram.getStatistics().getMax();
}
@Override
public long getMin() {
return histogram.getStatistics().getMin();
}
@Override
public double getMedian() {
return histogram.getStatistics().getQuantile(0.5);
}
@Override
public double get75thPercentile() {
return histogram.getStatistics().getQuantile(0.75);
}
@Override
public double get95thPercentile() {
return histogram.getStatistics().getQuantile(0.95);
}
@Override
public double get98thPercentile() {
return histogram.getStatistics().getQuantile(0.98);
}
@Override
public double get99thPercentile() {
return histogram.getStatistics().getQuantile(0.99);
}
@Override
public double get999thPercentile() {
return histogram.getStatistics().getQuantile(0.999);
}
}
/**
* JMX Server implementation that JMX clients can connect to.
*
......
......@@ -25,12 +25,13 @@ import org.apache.flink.metrics.groups.scope.ScopeFormats;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.metrics.util.TestReporter;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
public class MetricRegistryTest {
public class MetricRegistryTest extends TestLogger {
/**
* Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
......
......@@ -20,6 +20,8 @@ package org.apache.flink.metrics.groups;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
......@@ -57,6 +59,25 @@ public class MetricGroupRegistrationTest {
Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
assertEquals("gauge", TestReporter1.lastPassedName);
Histogram histogram = root.histogram("histogram", new Histogram() {
@Override
public void update(long value) {
}
@Override
public long getCount() {
return 0;
}
@Override
public HistogramStatistics getStatistics() {
return null;
}
});
Assert.assertEquals(histogram, TestReporter1.lastPassedMetric);
assertEquals("histogram", TestReporter1.lastPassedName);
registry.shutdown();
}
......
......@@ -20,11 +20,16 @@ package org.apache.flink.metrics.reporter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.util.TestReporter;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
......@@ -37,7 +42,7 @@ import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_CLASS;
import static org.junit.Assert.assertEquals;
public class JMXReporterTest {
public class JMXReporterTest extends TestLogger {
@Test
public void testReplaceInvalidChars() {
......@@ -188,4 +193,105 @@ public class JMXReporterTest {
rep2.close();
reg.shutdown();
}
/**
* Tests that histograms are properly reported via the JMXReporter.
*/
@Test
public void testHistogramReporting() throws Exception {
MetricRegistry registry = null;
String histogramName = "histogram";
try {
Configuration config = new Configuration();
registry = new MetricRegistry(config);
TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
TestingHistogram histogram = new TestingHistogram();
metricGroup.histogram(histogramName, histogram);
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents()));
MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
MBeanAttributeInfo[] attributeInfos = info.getAttributes();
assertEquals(11, attributeInfos.length);
assertEquals(histogram.getCount(), mBeanServer.getAttribute(objectName, "Count"));
assertEquals(histogram.getStatistics().getMean(), mBeanServer.getAttribute(objectName, "Mean"));
assertEquals(histogram.getStatistics().getStdDev(), mBeanServer.getAttribute(objectName, "StdDev"));
assertEquals(histogram.getStatistics().getMax(), mBeanServer.getAttribute(objectName, "Max"));
assertEquals(histogram.getStatistics().getMin(), mBeanServer.getAttribute(objectName, "Min"));
assertEquals(histogram.getStatistics().getQuantile(0.5), mBeanServer.getAttribute(objectName, "Median"));
assertEquals(histogram.getStatistics().getQuantile(0.75), mBeanServer.getAttribute(objectName, "75thPercentile"));
assertEquals(histogram.getStatistics().getQuantile(0.95), mBeanServer.getAttribute(objectName, "95thPercentile"));
assertEquals(histogram.getStatistics().getQuantile(0.98), mBeanServer.getAttribute(objectName, "98thPercentile"));
assertEquals(histogram.getStatistics().getQuantile(0.99), mBeanServer.getAttribute(objectName, "99thPercentile"));
assertEquals(histogram.getStatistics().getQuantile(0.999), mBeanServer.getAttribute(objectName, "999thPercentile"));
} finally {
if (registry != null) {
registry.shutdown();
}
}
}
static class TestingHistogram implements Histogram {
@Override
public void update(long value) {
}
@Override
public long getCount() {
return 1;
}
@Override
public HistogramStatistics getStatistics() {
return new HistogramStatistics() {
@Override
public double getQuantile(double quantile) {
return quantile;
}
@Override
public long[] getValues() {
return new long[0];
}
@Override
public int size() {
return 3;
}
@Override
public double getMean() {
return 4;
}
@Override
public double getStdDev() {
return 5;
}
@Override
public long getMax() {
return 6;
}
@Override
public long getMin() {
return 7;
}
};
}
}
}
......@@ -24,7 +24,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metric-reporters</artifactId>
<artifactId>flink-metrics</artifactId>
<version>1.1-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......@@ -40,6 +40,14 @@ under the License.
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
......
......@@ -24,14 +24,19 @@ import com.codahale.metrics.ScheduledReporter;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.CounterWrapper;
import org.apache.flink.dropwizard.metrics.GaugeWrapper;
import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
......@@ -44,6 +49,8 @@ import java.util.SortedMap;
@PublicEvolving
public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter {
protected final Logger log = LoggerFactory.getLogger(getClass());
public static final String ARG_HOST = "host";
public static final String ARG_PORT = "port";
public static final String ARG_PREFIX = "prefix";
......@@ -58,6 +65,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
private final Map<Gauge<?>, String> gauges = new HashMap<>();
private final Map<Counter, String> counters = new HashMap<>();
private final Map<Histogram, String> histograms = new HashMap<>();
// ------------------------------------------------------------------------
......@@ -90,11 +98,23 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
synchronized (this) {
if (metric instanceof Counter) {
counters.put((Counter) metric, fullName);
registry.register(fullName, new CounterWrapper((Counter) metric));
registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
}
else if (metric instanceof Gauge) {
gauges.put((Gauge<?>) metric, fullName);
registry.register(fullName, GaugeWrapper.fromGauge((Gauge<?>) metric));
registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
} else if (metric instanceof Histogram) {
Histogram histogram = (Histogram) metric;
histograms.put(histogram, fullName);
if (histogram instanceof DropwizardHistogramWrapper) {
registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizarHistogram());
} else {
registry.register(fullName, new FlinkHistogramWrapper(histogram));
}
} else {
log.warn("Cannot add metric of type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
}
}
}
......@@ -108,6 +128,8 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
fullName = counters.remove(metric);
} else if (metric instanceof Gauge) {
fullName = gauges.remove(metric);
} else if (metric instanceof Histogram) {
fullName = histograms.remove(metric);
} else {
fullName = null;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.dropwizard.metrics;
import com.codahale.metrics.Snapshot;
import org.apache.flink.metrics.HistogramStatistics;
/**
* Dropwizard histogram statistics implementation returned by {@link DropwizardHistogramWrapper}.
* The statistics class wraps a {@link Snapshot} instance and forwards the method calls accordingly.
*/
class DropwizardHistogramStatistics extends HistogramStatistics {
private final com.codahale.metrics.Snapshot snapshot;
DropwizardHistogramStatistics(com.codahale.metrics.Snapshot snapshot) {
this.snapshot = snapshot;
}
@Override
public double getQuantile(double quantile) {
return snapshot.getValue(quantile);
}
@Override
public long[] getValues() {
return snapshot.getValues();
}
@Override
public int size() {
return snapshot.size();
}
@Override
public double getMean() {
return snapshot.getMean();
}
@Override
public double getStdDev() {
return snapshot.getStdDev();
}
@Override
public long getMax() {
return snapshot.getMax();
}
@Override
public long getMin() {
return snapshot.getMin();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.dropwizard.metrics;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
/**
* Wrapper to use a Dropwizard {@link com.codahale.metrics.Histogram} as a Flink {@link Histogram}.
*/
public class DropwizardHistogramWrapper implements Histogram {
private final com.codahale.metrics.Histogram dropwizarHistogram;
public DropwizardHistogramWrapper(com.codahale.metrics.Histogram dropwizardHistogram) {
this.dropwizarHistogram = dropwizardHistogram;
}
public com.codahale.metrics.Histogram getDropwizarHistogram() {
return dropwizarHistogram;
}
@Override
public void update(long value) {
dropwizarHistogram.update(value);
}
@Override
public long getCount() {
return dropwizarHistogram.getCount();
}
@Override
public HistogramStatistics getStatistics() {
return new DropwizardHistogramStatistics(dropwizarHistogram.getSnapshot());
}
}
......@@ -19,10 +19,10 @@ package org.apache.flink.dropwizard.metrics;
import org.apache.flink.metrics.Counter;
public class CounterWrapper extends com.codahale.metrics.Counter {
public class FlinkCounterWrapper extends com.codahale.metrics.Counter {
private final Counter counter;
public CounterWrapper(Counter counter) {
public FlinkCounterWrapper(Counter counter) {
this.counter = counter;
}
......
......@@ -20,11 +20,11 @@ package org.apache.flink.dropwizard.metrics;
import org.apache.flink.metrics.Gauge;
public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
public class FlinkGaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
private final Gauge<T> gauge;
public GaugeWrapper(Gauge<T> gauge) {
public FlinkGaugeWrapper(Gauge<T> gauge) {
this.gauge = gauge;
}
......@@ -33,9 +33,9 @@ public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
return this.gauge.getValue();
}
public static <T> GaugeWrapper<T> fromGauge(Gauge<?> gauge) {
public static <T> FlinkGaugeWrapper<T> fromGauge(Gauge<?> gauge) {
@SuppressWarnings("unchecked")
Gauge<T> typedGauge = (Gauge<T>) gauge;
return new GaugeWrapper<>(typedGauge);
return new FlinkGaugeWrapper<>(typedGauge);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.dropwizard.metrics;
import com.codahale.metrics.Snapshot;
import org.apache.flink.metrics.Histogram;
/**
* Wrapper to use a Flink {@link Histogram} as a Dropwizard {@link com.codahale.metrics.Histogram}.
* This is necessary to report Flink's histograms via the Dropwizard
* {@link com.codahale.metrics.Reporter}.
*/
public class FlinkHistogramWrapper extends com.codahale.metrics.Histogram {
private final Histogram histogram;
public FlinkHistogramWrapper(Histogram histogram) {
super(null);
this.histogram = histogram;
}
@Override
public void update(long value) {
histogram.update(value);
}
@Override
public long getCount() {
return histogram.getCount();
}
@Override
public Snapshot getSnapshot() {
return new HistogramStatisticsWrapper(histogram.getStatistics());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.dropwizard.metrics;
import com.codahale.metrics.Snapshot;
import org.apache.flink.metrics.HistogramStatistics;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.Charset;
/**
* Wrapper to use Flink's {@link HistogramStatistics} as a Dropwizard {@link Snapshot}. This is
* necessary to report Flink's histograms via the Dropwizard {@link com.codahale.metrics.Reporter}.
*/
class HistogramStatisticsWrapper extends Snapshot {
private static final Charset UTF_8 = Charset.forName("UTF-8");
private final HistogramStatistics histogramStatistics;
HistogramStatisticsWrapper(HistogramStatistics histogramStatistics) {
this.histogramStatistics = histogramStatistics;
}
@Override
public double getValue(double quantile) {
return histogramStatistics.getQuantile(quantile);
}
@Override
public long[] getValues() {
return histogramStatistics.getValues();
}
@Override
public int size() {
return histogramStatistics.size();
}
@Override
public long getMax() {
return histogramStatistics.getMax();
}
@Override
public double getMean() {
return histogramStatistics.getMean();
}
@Override
public long getMin() {
return histogramStatistics.getMin();
}
@Override
public double getStdDev() {
return histogramStatistics.getStdDev();
}
@Override
public void dump(OutputStream output) {
try(PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(output, UTF_8))){
for (Long value : histogramStatistics.getValues()) {
printWriter.printf("%d%n", value);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.dropwizard.metrics;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_INTERVAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
/**
* Tests the histogram functionality of the DropwizardHistogramWrapper.
*/
@Test
public void testDropwizardHistogramWrapper() {
int size = 10;
DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));
for (int i = 0; i < size; i++) {
histogramWrapper.update(i);
assertEquals(i + 1, histogramWrapper.getCount());
assertEquals(i, histogramWrapper.getStatistics().getMax());
assertEquals(0, histogramWrapper.getStatistics().getMin());
}
assertEquals(size, histogramWrapper.getStatistics().size());
assertEquals((size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
for (int i = size; i < 2 * size; i++) {
histogramWrapper.update(i);
assertEquals(i + 1, histogramWrapper.getCount());
assertEquals(i, histogramWrapper.getStatistics().getMax());
assertEquals(i + 1 - size, histogramWrapper.getStatistics().getMin());
}
assertEquals(size, histogramWrapper.getStatistics().size());
assertEquals(size + (size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
}
/**
* Tests that the DropwizardHistogramWrapper reports correct dropwizard snapshots to the
* ScheduledReporter.
*/
@Test
public void testDropwizardHistogramWrapperReporting() throws Exception {
long reportingInterval = 1000;
long timeout = 30000;
int size = 10;
String histogramMetricName = "histogram";
Configuration config = new Configuration();
config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestingReporter.class.getName());
config.setString(KEY_METRICS_REPORTER_INTERVAL, reportingInterval + " MILLISECONDS");
MetricRegistry registry = null;
try {
registry = new MetricRegistry(config);
DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));
TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
metricGroup.histogram(histogramMetricName, histogramWrapper);
String fullMetricName = metricGroup.getScopeString() + "." + histogramMetricName;
Field f = registry.getClass().getDeclaredField("reporter");
f.setAccessible(true);
MetricReporter reporter = (MetricReporter) f.get(registry);
assertTrue(reporter instanceof TestingReporter);
TestingReporter testingReporter = (TestingReporter) reporter;
TestingScheduledReporter scheduledReporter = testingReporter.scheduledReporter;
// check that the metric has been registered
assertEquals(1, testingReporter.getMetrics().size());
for (int i = 0; i < size; i++) {
histogramWrapper.update(i);
}
Future<Snapshot> snapshotFuture = scheduledReporter.getNextHistogramSnapshot(fullMetricName);
Snapshot snapshot = snapshotFuture.get(timeout, TimeUnit.MILLISECONDS);
assertEquals(0, snapshot.getMin());
assertEquals((size - 1) / 2.0, snapshot.getMedian(), 0.001);
assertEquals(size - 1, snapshot.getMax());
assertEquals(size, snapshot.size());
registry.unregister(histogramWrapper, "histogram", metricGroup);
// check that the metric has been de-registered
assertEquals(0, testingReporter.getMetrics().size());
} finally {
if (registry != null) {
registry.shutdown();
}
}
}
public static class TestingReporter extends ScheduledDropwizardReporter {
TestingScheduledReporter scheduledReporter = null;
@Override
public ScheduledReporter getReporter(Configuration config) {
scheduledReporter = new TestingScheduledReporter(
registry,
getClass().getName(),
null,
TimeUnit.MILLISECONDS,
TimeUnit.MILLISECONDS);
return scheduledReporter;
}
public Map<String, com.codahale.metrics.Metric> getMetrics() {
return registry.getMetrics();
}
}
static class TestingScheduledReporter extends ScheduledReporter {
final Map<String, Snapshot> histogramSnapshots = new HashMap<>();
final Map<String, List<CompletableFuture<Snapshot>>> histogramSnapshotFutures = new HashMap<>();
protected TestingScheduledReporter(com.codahale.metrics.MetricRegistry registry, String name, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit) {
super(registry, name, filter, rateUnit, durationUnit);
}
@Override
public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, com.codahale.metrics.Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
for (Map.Entry<String, com.codahale.metrics.Histogram> entry: histograms.entrySet()) {
reportHistogram(entry.getKey(), entry.getValue());
}
}
void reportHistogram(String name, com.codahale.metrics.Histogram histogram) {
histogramSnapshots.put(name, histogram.getSnapshot());
synchronized (histogramSnapshotFutures) {
if (histogramSnapshotFutures.containsKey(name)) {
List<CompletableFuture<Snapshot>> futures = histogramSnapshotFutures.remove(name);
for (CompletableFuture<Snapshot> future: futures) {
future.complete(histogram.getSnapshot());
}
}
}
}
Future<Snapshot> getNextHistogramSnapshot(String name) {
synchronized (histogramSnapshotFutures) {
List<CompletableFuture<Snapshot>> futures;
if (histogramSnapshotFutures.containsKey(name)) {
futures = histogramSnapshotFutures.get(name);
} else {
futures = new ArrayList<>();
histogramSnapshotFutures.put(name, futures);
}
CompletableFuture<Snapshot> future = new CompletableFuture<>();
futures.add(future);
return future;
}
}
}
static class CompletableFuture<T> implements Future<T> {
private Exception exception = null;
private T value = null;
private Object lock = new Object();
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
synchronized (lock) {
if (isDone()) {
return false;
} else {
exception = new CancellationException("Future was cancelled.");
lock.notifyAll();
return true;
}
}
}
@Override
public boolean isCancelled() {
return exception instanceof CancellationException;
}
@Override
public boolean isDone() {
return value != null || exception != null;
}
@Override
public T get() throws InterruptedException, ExecutionException {
while (!isDone() && !isCancelled()) {
synchronized (lock) {
lock.wait();
}
}
if (exception != null) {
throw new ExecutionException(exception);
} else if (value != null) {
return value;
} else {
throw new ExecutionException(new Exception("Future did not complete correctly."));
}
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long timeoutMs = unit.toMillis(timeout);
long timeoutEnd = timeoutMs + System.currentTimeMillis();
while (!isDone() && !isCancelled() && timeoutMs > 0) {
synchronized (lock) {
lock.wait(unit.toMillis(timeoutMs));
}
timeoutMs = timeoutEnd - System.currentTimeMillis();
}
if (exception != null) {
throw new ExecutionException(exception);
} else if (value != null) {
return value;
} else {
throw new ExecutionException(new Exception("Future did not complete correctly."));
}
}
public boolean complete(T value) {
synchronized (lock) {
if (!isDone()) {
this.value = value;
lock.notifyAll();
return true;
} else {
return false;
}
}
}
public boolean fail(Exception exception) {
synchronized (lock) {
if (!isDone()) {
this.exception = exception;
lock.notifyAll();
return true;
} else {
return false;
}
}
}
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
log4j.rootLogger=OFF, testlogger
# A1 is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
<logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
<logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
<logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
<logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
</configuration>
\ No newline at end of file
......@@ -24,7 +24,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metric-reporters</artifactId>
<artifactId>flink-metrics</artifactId>
<version>1.1-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -24,7 +24,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metric-reporters</artifactId>
<artifactId>flink-metrics</artifactId>
<version>1.1-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -24,7 +24,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metric-reporters</artifactId>
<artifactId>flink-metrics</artifactId>
<version>1.1-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......@@ -39,5 +39,13 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -22,6 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.Scheduled;
......@@ -110,6 +112,10 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
}
reportCounter(entry.getValue(), entry.getKey());
}
for (Map.Entry<Histogram, String> entry : histograms.entrySet()) {
reportHistogram(entry.getValue(), entry.getKey());
}
}
catch (ConcurrentModificationException | NoSuchElementException e) {
// ignore - may happen when metrics are concurrently added or removed
......@@ -130,6 +136,41 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
}
}
private void reportHistogram(final String name, final Histogram histogram) {
if (histogram != null) {
HistogramStatistics statistics = histogram.getStatistics();
if (statistics != null) {
send(prefix(name, "count"), String.valueOf(histogram.getCount()));
send(prefix(name, "max"), String.valueOf(statistics.getMax()));
send(prefix(name, "min"), String.valueOf(statistics.getMin()));
send(prefix(name, "mean"), String.valueOf(statistics.getMean()));
send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev()));
send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5)));
send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75)));
send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95)));
send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98)));
send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99)));
send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999)));
}
}
}
private String prefix(String ... names) {
if (names.length > 0) {
StringBuilder stringBuilder = new StringBuilder(names[0]);
for (int i = 1; i < names.length; i++) {
stringBuilder.append('.').append(names[i]);
}
return stringBuilder.toString();
} else {
return "";
}
}
private void send(final String name, final String value) {
try {
String formatted = String.format("%s:%s|g", name, value);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.statsd;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
public class StatsDReporterTest extends TestLogger {
/**
* Tests that histograms are properly reported via the StatsD reporter
*/
@Test
public void testStatsDHistogramReporting() throws Exception {
MetricRegistry registry = null;
DatagramSocketReceiver receiver = null;
Thread receiverThread = null;
long timeout = 5000;
long joinTimeout = 30000;
String histogramName = "histogram";
try {
receiver = new DatagramSocketReceiver();
receiverThread = new Thread(receiver);
receiverThread.start();
int port = receiver.getPort();
Configuration config = new Configuration();
config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, StatsDReporter.class.getName());
config.setString(MetricRegistry.KEY_METRICS_REPORTER_INTERVAL, "1 SECONDS");
config.setString(MetricRegistry.KEY_METRICS_REPORTER_ARGUMENTS, "--host localhost --port " + port);
registry = new MetricRegistry(config);
TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
TestingHistogram histogram = new TestingHistogram();
metricGroup.histogram(histogramName, histogram);
receiver.waitUntilNumLines(11, timeout);
Set<String> lines = receiver.getLines();
String prefix = metricGroup.getScopeString() + "." + histogramName;
Set<String> expectedLines = new HashSet<>();
expectedLines.add(prefix + ".count:1|g");
expectedLines.add(prefix + ".mean:3.0|g");
expectedLines.add(prefix + ".min:6|g");
expectedLines.add(prefix + ".max:5|g");
expectedLines.add(prefix + ".stddev:4.0|g");
expectedLines.add(prefix + ".p75:0.75|g");
expectedLines.add(prefix + ".p98:0.98|g");
expectedLines.add(prefix + ".p99:0.99|g");
expectedLines.add(prefix + ".p999:0.999|g");
expectedLines.add(prefix + ".p95:0.95|g");
expectedLines.add(prefix + ".p50:0.5|g");
assertEquals(expectedLines, lines);
} finally {
if (registry != null) {
registry.shutdown();
}
if (receiver != null) {
receiver.stop();
}
if (receiverThread != null) {
receiverThread.join(joinTimeout);
}
}
}
public static class TestingHistogram implements Histogram {
@Override
public void update(long value) {
}
@Override
public long getCount() {
return 1;
}
@Override
public HistogramStatistics getStatistics() {
return new HistogramStatistics() {
@Override
public double getQuantile(double quantile) {
return quantile;
}
@Override
public long[] getValues() {
return new long[0];
}
@Override
public int size() {
return 2;
}
@Override
public double getMean() {
return 3;
}
@Override
public double getStdDev() {
return 4;
}
@Override
public long getMax() {
return 5;
}
@Override
public long getMin() {
return 6;
}
};
}
}
public static class DatagramSocketReceiver implements Runnable {
private static final Object obj = new Object();
private final DatagramSocket socket;
private final ConcurrentHashMap<String, Object> lines;
private boolean running = true;
public DatagramSocketReceiver() throws SocketException {
socket = new DatagramSocket();
lines = new ConcurrentHashMap<>();
}
public int getPort() {
return socket.getLocalPort();
}
public void stop() {
running = false;
socket.close();
}
public void waitUntilNumLines(int numberLines, long timeout) throws TimeoutException {
long endTimeout = System.currentTimeMillis() + timeout;
long remainingTimeout = timeout;
while (numberLines > lines.size() && remainingTimeout > 0) {
synchronized (lines) {
try {
lines.wait(remainingTimeout);
} catch (InterruptedException e) {
// ignore interruption exceptions
}
}
remainingTimeout = endTimeout - System.currentTimeMillis();
}
if (remainingTimeout <= 0) {
throw new TimeoutException("Have not received " + numberLines + " in time.");
}
}
public Set<String> getLines() {
return lines.keySet();
}
@Override
public void run() {
while (running) {
try {
byte[] buffer = new byte[1024];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
String line = new String(packet.getData(), 0, packet.getLength());
lines.put(line, obj);
synchronized (lines) {
lines.notifyAll();
}
} catch (IOException ex) {
// ignore the exceptions
}
}
}
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
log4j.rootLogger=OFF, testlogger
# A1 is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
<logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
<logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
<logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
<logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
</configuration>
\ No newline at end of file
......@@ -29,8 +29,8 @@ under the License.
<relativePath>..</relativePath>
</parent>
<artifactId>flink-metric-reporters</artifactId>
<name>flink-metric-reporters</name>
<artifactId>flink-metrics</artifactId>
<name>flink-metrics</name>
<packaging>pom</packaging>
<modules>
......
......@@ -74,7 +74,7 @@ under the License.
<module>flink-quickstart</module>
<module>flink-contrib</module>
<module>flink-dist</module>
<module>flink-metric-reporters</module>
<module>flink-metrics</module>
</modules>
<properties>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册