提交 56cdec7d 编写于 作者: Z zentol

[FLINK-4074] Make metric reporters less blocking

This closes #2105
上级 9487fcbf
......@@ -31,6 +31,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
......@@ -60,7 +62,7 @@ public class MetricRegistry {
static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
private final MetricReporter reporter;
private final java.util.Timer timer;
private final ScheduledExecutorService executor;
private final ScopeFormats scopeFormats;
......@@ -86,12 +88,11 @@ public class MetricRegistry {
// by default, create JMX metrics
LOG.info("No metrics reporter configured, exposing metrics via JMX");
this.reporter = new JMXReporter();
this.timer = null;
this.executor = null;
}
else {
MetricReporter reporter;
java.util.Timer timer;
ScheduledExecutorService executor = null;
try {
String configuredPeriod = config.getString(KEY_METRICS_REPORTER_INTERVAL, null);
TimeUnit timeunit = TimeUnit.SECONDS;
......@@ -117,24 +118,20 @@ public class MetricRegistry {
reporter.open(reporterConfig);
if (reporter instanceof Scheduled) {
executor = Executors.newSingleThreadScheduledExecutor();
LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
long millis = timeunit.toMillis(period);
timer = new java.util.Timer("Periodic Metrics Reporter", true);
timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
}
else {
timer = null;
executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
}
}
catch (Throwable t) {
reporter = new JMXReporter();
timer = null;
shutdownExecutor();
LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
}
this.reporter = reporter;
this.timer = timer;
this.executor = executor;
}
}
......@@ -142,9 +139,6 @@ public class MetricRegistry {
* Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
*/
public void shutdown() {
if (timer != null) {
timer.cancel();
}
if (reporter != null) {
try {
reporter.close();
......@@ -152,6 +146,21 @@ public class MetricRegistry {
LOG.warn("Metrics reporter did not shut down cleanly", t);
}
}
shutdownExecutor();
}
private void shutdownExecutor() {
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
}
public ScopeFormats getScopeFormats() {
......
......@@ -53,6 +53,8 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
// public static final String ARG_CONVERSION_RATE = "rateConversion";
// public static final String ARG_CONVERSION_DURATION = "durationConversion";
private boolean closed = false;
private DatagramSocket socket;
private InetSocketAddress address;
......@@ -81,6 +83,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
@Override
public void close() {
closed = true;
if (socket != null && !socket.isClosed()) {
socket.close();
}
......@@ -95,10 +98,16 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
// operator creation and shutdown
try {
for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
if (closed) {
return;
}
reportGauge(entry.getValue(), entry.getKey());
}
for (Map.Entry<Counter, String> entry : counters.entrySet()) {
if (closed) {
return;
}
reportCounter(entry.getValue(), entry.getKey());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册