diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index c67c5ea2e852e560c4ad975cc78506bad39560c1..ab59977e4edda44dc7d2cc24f389e36be4eff125 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -63,7 +63,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public abstract class AbstractMetricGroup> implements MetricGroup { - private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); + protected static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index cb7aaa005180328a717c4a494f400740edb33190..338fc203cb29ed2ca7ee7a16ef00f9306566d391 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -42,6 +42,8 @@ public class TaskMetricGroup extends ComponentMetricGroup operators = new HashMap<>(); + static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80; + private final TaskIOMetricGroup ioMetrics; /** The execution Id uniquely identifying the executed task represented by this metrics group. */ @@ -131,6 +133,10 @@ public class TaskMetricGroup extends ComponentMetricGroup METRICS_OPERATOR_NAME_MAX_LENGTH) { + LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH); + name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH); + } OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name); synchronized (this) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index 22b0a1a893e2937ef21241ad0103183e27e900b4..47fc98b7ad6c8aba61d48de1798db3931377ef68 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -25,10 +25,12 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -150,6 +152,23 @@ public class TaskMetricGroupTest extends TestLogger { registry.shutdown(); } + @Test + public void testOperatorNameTruncation() { + Configuration cfg = new Configuration(); + cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME); + MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); + TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname"); + TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new AbstractID(), new AbstractID(), "task", 0, 0); + + String originalName = new String(new char[100]).replace("\0", "-"); + OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName); + + String storedName = operatorMetricGroup.getScopeComponents()[0]; + Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length()); + Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName); + } + private static class CountingMetricRegistry extends MetricRegistry { private int counter = 0;