From edb79b0a8bb2b33e1a0470b99b11274ac0e9c673 Mon Sep 17 00:00:00 2001 From: zentol Date: Mon, 12 Jun 2017 15:36:25 +0200 Subject: [PATCH] [FLINK-6898] [metrics] Limit size of operator component in metric name This closes #4109. --- .../metrics/groups/AbstractMetricGroup.java | 2 +- .../metrics/groups/TaskMetricGroup.java | 6 ++++++ .../metrics/groups/TaskMetricGroupTest.java | 19 +++++++++++++++++++ 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index c67c5ea2e85..ab59977e4ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -63,7 +63,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public abstract class AbstractMetricGroup> implements MetricGroup { - private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); + protected static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index cb7aaa00518..338fc203cb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -42,6 +42,8 @@ public class TaskMetricGroup extends ComponentMetricGroup operators = new HashMap<>(); + static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80; + private final TaskIOMetricGroup ioMetrics; /** The execution Id uniquely identifying the executed task represented by this metrics group. */ @@ -131,6 +133,10 @@ public class TaskMetricGroup extends ComponentMetricGroup METRICS_OPERATOR_NAME_MAX_LENGTH) { + LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH); + name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH); + } OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name); synchronized (this) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index 22b0a1a893e..47fc98b7ad6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -25,10 +25,12 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -150,6 +152,23 @@ public class TaskMetricGroupTest extends TestLogger { registry.shutdown(); } + @Test + public void testOperatorNameTruncation() { + Configuration cfg = new Configuration(); + cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME); + MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); + TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname"); + TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new AbstractID(), new AbstractID(), "task", 0, 0); + + String originalName = new String(new char[100]).replace("\0", "-"); + OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName); + + String storedName = operatorMetricGroup.getScopeComponents()[0]; + Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length()); + Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName); + } + private static class CountingMetricRegistry extends MetricRegistry { private int counter = 0; -- GitLab