From 0bf0fdc26ea86020929fa64d083dce02ba2a2ae2 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 6 Dec 2017 14:39:15 +0100 Subject: [PATCH] [FLINK-8213][metrics] Improve fallback behaviors This closes #8213. --- .../internal/KafkaConsumerThreadTest.java | 4 +- .../flink/storm/wrappers/BoltWrapperTest.java | 4 +- .../webmonitor/WebRuntimeMonitorITCase.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 6 +- .../runtime/metrics/NoOpMetricRegistry.java | 15 +- .../groups/UnregisteredMetricGroups.java | 164 ++++++++++++++++++ .../flink/runtime/operators/DataSinkTask.java | 22 ++- .../runtime/operators/DataSourceTask.java | 25 ++- .../IndividualRestartsConcurrencyTest.java | 4 +- .../partition/InputGateConcurrentTest.java | 16 +- .../partition/InputGateFairnessTest.java | 18 +- .../consumer/LocalInputChannelTest.java | 14 +- .../consumer/RemoteInputChannelTest.java | 8 +- .../consumer/SingleInputGateTest.java | 24 +-- .../consumer/TestSingleInputGate.java | 4 +- .../consumer/UnionInputGateTest.java | 6 +- .../JobManagerHAJobGraphRecoveryITCase.java | 2 +- .../jobmanager/JobManagerHARecoveryTest.java | 7 +- .../runtime/jobmanager/JobManagerTest.java | 20 +-- .../runtime/jobmanager/JobSubmitTest.java | 2 +- .../JobManagerLeaderElectionTest.java | 3 +- .../metrics/TaskManagerMetricsTest.java | 2 +- .../metrics/groups/MetricGroupTest.java | 10 +- .../metrics/groups/TaskIOMetricGroupTest.java | 3 +- .../operators/drivers/TestTaskContext.java | 4 +- .../testutils/BinaryOperatorTestBase.java | 5 +- .../operators/testutils/DriverTestBase.java | 3 +- .../operators/testutils/DummyEnvironment.java | 3 +- .../operators/testutils/MockEnvironment.java | 3 +- .../testutils/UnaryOperatorTestBase.java | 3 +- .../UnregisteredTaskMetricsGroup.java | 83 --------- .../resourcemanager/ResourceManagerTest.java | 2 +- ...kManagerComponentsStartupShutdownTest.java | 4 +- .../TaskManagerProcessReapingTestBase.java | 2 +- .../taskmanager/TaskManagerStartupTest.java | 2 +- .../runtime/util/JvmExitOnFatalErrorTest.java | 4 +- .../api/operators/AbstractStreamOperator.java | 6 +- .../runtime/io/StreamInputProcessor.java | 13 +- .../runtime/io/StreamTwoInputProcessor.java | 13 +- .../runtime/tasks/OperatorChain.java | 18 +- .../async/RichAsyncFunctionTest.java | 3 +- .../async/AsyncWaitOperatorTest.java | 4 +- .../tasks/InterruptSensitiveRestoreTest.java | 4 +- .../runtime/tasks/StreamMockEnvironment.java | 4 +- .../tasks/StreamTaskTerminationTest.java | 4 +- .../runtime/tasks/StreamTaskTest.java | 4 +- .../tasks/TaskCheckpointingBehaviourTest.java | 4 +- ...TaskManagerProcessFailureRecoveryTest.java | 2 +- .../JobManagerHACheckpointRecoveryITCase.java | 2 +- ...erHAProcessFailureBatchRecoveryITCase.java | 2 +- .../ProcessFailureCancelingITCase.java | 2 +- .../AbstractOperatorRestoreTestBase.java | 4 +- .../flink/yarn/YarnResourceManagerTest.java | 2 +- 53 files changed, 376 insertions(+), 218 deletions(-) rename flink-runtime/src/{test => main}/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java (83%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java index 2368091ee20..383eb13ea33 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internal; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue; import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; @@ -716,7 +716,7 @@ public class KafkaConsumerThreadTest { handover, new Properties(), unassignedPartitionsQueue, - mock(MetricGroup.class), + new UnregisteredMetricsGroup(), new KafkaConsumerCallBridge(), "test-kafka-consumer-thread", 0, diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index f518d178b5d..98816e462c8 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.storm.util.SplitStreamType; @@ -372,7 +372,7 @@ public class BoltWrapperTest extends AbstractTest { Environment env = mock(Environment.class); when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader()); - when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); + when(env.getMetricGroup()).thenReturn(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()); when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo()); StreamTask mockTask = mock(StreamTask.class); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 75a844caa5a..e6cfdda968c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -194,7 +194,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.apply(webMonitor[i].getRestAddress()), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index c4c4445a66a..6d0de74ba1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.blob.BlobServer; @@ -78,6 +77,7 @@ import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.query.UnknownKvStateLocation; @@ -264,8 +264,8 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast this.jobManagerMetricGroup = jobManagerMetricGroup; this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph); } else { - this.jobManagerMetricGroup = new UnregisteredMetricsGroup(); - this.jobMetricGroup = new UnregisteredMetricsGroup(); + this.jobManagerMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(); + this.jobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(); } log.info("Initializing job {} ({}).", jobName, jid); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java similarity index 83% rename from flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java index 46d6548cd02..c161aa261b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java @@ -26,13 +26,16 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormats; import javax.annotation.Nullable; /** - * Metric registry which does nothing and is intended for testing purposes. + * Metric registry which does nothing. */ public class NoOpMetricRegistry implements MetricRegistry { + private static final char delimiter = '.'; + private static final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration()); - final char delimiter = ','; + public static final MetricRegistry INSTANCE = new NoOpMetricRegistry(); - final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration()); + private NoOpMetricRegistry() { + } @Override public char getDelimiter() { @@ -50,10 +53,12 @@ public class NoOpMetricRegistry implements MetricRegistry { } @Override - public void register(Metric metric, String metricName, AbstractMetricGroup group) {} + public void register(Metric metric, String metricName, AbstractMetricGroup group) { + } @Override - public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {} + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { + } @Override public ScopeFormats getScopeFormats() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java new file mode 100644 index 00000000000..3869aa642f9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; + +/** + * A collection of safe drop-in replacements for existing {@link ComponentMetricGroup}s. + */ +public class UnregisteredMetricGroups { + + private UnregisteredMetricGroups() { + } + + public static JobManagerMetricGroup createUnregisteredJobManagerMetricGroup() { + return new UnregisteredJobManagerMetricGroup(); + } + + public static JobManagerJobMetricGroup createUnregisteredJobManagerJobMetricGroup() { + return new UnregisteredJobManagerJobMetricGroup(); + } + + public static TaskManagerMetricGroup createUnregisteredTaskManagerMetricGroup() { + return new UnregisteredTaskManagerMetricGroup(); + } + + public static TaskManagerJobMetricGroup createUnregisteredTaskManagerJobMetricGroup() { + return new UnregisteredTaskManagerJobMetricGroup(); + } + + public static TaskMetricGroup createUnregisteredTaskMetricGroup() { + return new UnregisteredTaskMetricGroup(); + } + + public static OperatorMetricGroup createUnregisteredOperatorMetricGroup() { + return new UnregisteredOperatorMetricGroup(); + } + + /** + * A safe drop-in replacement for {@link JobManagerMetricGroup}s. + */ + public static class UnregisteredJobManagerMetricGroup extends JobManagerMetricGroup { + private static final String DEFAULT_HOST_NAME = "UnregisteredHost"; + + private UnregisteredJobManagerMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME); + } + + @Override + public JobManagerJobMetricGroup addJob(JobGraph job) { + return createUnregisteredJobManagerJobMetricGroup(); + } + } + + /** + * A safe drop-in replacement for {@link JobManagerJobMetricGroup}s. + */ + public static class UnregisteredJobManagerJobMetricGroup extends JobManagerJobMetricGroup { + private static final JobID DEFAULT_JOB_ID = new JobID(0, 0); + private static final String DEFAULT_JOB_NAME = "UnregisteredJob"; + + protected UnregisteredJobManagerJobMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, new UnregisteredJobManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME); + } + } + + /** + * A safe drop-in replacement for {@link TaskManagerMetricGroup}s. + */ + public static class UnregisteredTaskManagerMetricGroup extends TaskManagerMetricGroup { + private static final String DEFAULT_HOST_NAME = "UnregisteredHost"; + private static final String DEFAULT_TASKMANAGER_ID = "0"; + + protected UnregisteredTaskManagerMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, DEFAULT_TASKMANAGER_ID); + } + + @Override + public TaskMetricGroup addTaskForJob( + final JobID jobId, + final String jobName, + final JobVertexID jobVertexId, + final ExecutionAttemptID executionAttemptId, + final String taskName, + final int subtaskIndex, + final int attemptNumber) { + return createUnregisteredTaskMetricGroup(); + } + } + + /** + * A safe drop-in replacement for {@link TaskManagerJobMetricGroup}s. + */ + public static class UnregisteredTaskManagerJobMetricGroup extends TaskManagerJobMetricGroup { + private static final JobID DEFAULT_JOB_ID = new JobID(0, 0); + private static final String DEFAULT_JOB_NAME = "UnregisteredJob"; + + protected UnregisteredTaskManagerJobMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME); + } + + @Override + public TaskMetricGroup addTask( + final JobVertexID jobVertexId, + final ExecutionAttemptID executionAttemptID, + final String taskName, + final int subtaskIndex, + final int attemptNumber) { + return createUnregisteredTaskMetricGroup(); + } + } + + /** + * A safe drop-in replacement for {@link TaskMetricGroup}s. + */ + public static class UnregisteredTaskMetricGroup extends TaskMetricGroup { + private static final JobVertexID DEFAULT_VERTEX_ID = new JobVertexID(0, 0); + private static final ExecutionAttemptID DEFAULT_ATTEMPT_ID = new ExecutionAttemptID(0, 0); + private static final String DEFAULT_TASK_NAME = "UnregisteredTask"; + + protected UnregisteredTaskMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerJobMetricGroup(), + DEFAULT_VERTEX_ID, DEFAULT_ATTEMPT_ID, DEFAULT_TASK_NAME, 0, 0); + } + + @Override + public OperatorMetricGroup addOperator(OperatorID operatorID, String name) { + return createUnregisteredOperatorMetricGroup(); + } + } + + /** + * A safe drop-in replacement for {@link OperatorMetricGroup}s. + */ + public static class UnregisteredOperatorMetricGroup extends OperatorMetricGroup { + private static final OperatorID DEFAULT_OPERATOR_ID = new OperatorID(0, 0); + private static final String DEFAULT_OPERATOR_NAME = "UnregisteredOperator"; + + protected UnregisteredOperatorMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskMetricGroup(), DEFAULT_OPERATOR_ID, DEFAULT_OPERATOR_NAME); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index bd052f5980e..bb253abb561 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -29,12 +29,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.reader.MutableReader; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; @@ -107,10 +109,22 @@ public class DataSinkTask extends AbstractInvokable { LOG.debug(getLogString("Starting data sink operator")); RuntimeContext ctx = createRuntimeContext(); - final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); - ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask(); - ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask(); - + + final Counter numRecordsIn; + { + Counter tmpNumRecordsIn; + try { + OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup(); + ioMetricGroup.reuseInputMetricsForTask(); + ioMetricGroup.reuseOutputMetricsForTask(); + tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + tmpNumRecordsIn = new SimpleCounter(); + } + numRecordsIn = tmpNumRecordsIn; + } + if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){ ((RichOutputFormat) this.format).setRuntimeContext(ctx); LOG.debug(getLogString("Rich Sink detected. Initializing runtime context.")); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 2600b2c6946..14378770d8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -27,12 +27,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; @@ -102,12 +104,25 @@ public class DataSourceTask extends AbstractInvokable { LOG.debug(getLogString("Starting data source operator")); RuntimeContext ctx = createRuntimeContext(); - Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); - ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask(); - Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter(); - if (this.config.getNumberOfChainedStubs() == 0) { - ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask(); + + final Counter numRecordsOut; + { + Counter tmpNumRecordsOut; + try { + OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup(); + ioMetricGroup.reuseInputMetricsForTask(); + if (this.config.getNumberOfChainedStubs() == 0) { + ioMetricGroup.reuseOutputMetricsForTask(); + } + tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + tmpNumRecordsOut = new SimpleCounter(); + } + numRecordsOut = tmpNumRecordsOut; } + + Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { ((RichInputFormat) this.format).setRuntimeContext(ctx); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java index cb94d25d3e1..c9775036e2a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java @@ -45,7 +45,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -344,7 +344,7 @@ public class IndividualRestartsConcurrencyTest extends TestLogger { 1, allVertices, checkpointCoordinatorConfiguration, - new UnregisteredTaskMetricsGroup())); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup())); final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java index 64f82f3fd0d..6f98119ed28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -63,11 +63,11 @@ public class InputGateConcurrentTest { new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0; i < numChannels; i++) { LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(), - resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); partitions[i] = new PipelinedSubpartition(0, resultPartition); @@ -99,12 +99,12 @@ public class InputGateConcurrentTest { 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0; i < numChannels; i++) { RemoteInputChannel channel = new RemoteInputChannel( gate, i, new ResultPartitionID(), mock(ConnectionID.class), - connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); sources[i] = new RemoteChannelSource(channel); @@ -148,7 +148,7 @@ public class InputGateConcurrentTest { 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0, local = 0; i < numChannels; i++) { if (localOrRemote.get(i)) { @@ -158,14 +158,14 @@ public class InputGateConcurrentTest { sources[i] = new PipelinedSubpartitionSource(psp); LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(), - resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); } else { //remote channel RemoteInputChannel channel = new RemoteInputChannel( gate, i, new ResultPartitionID(), mock(ConnectionID.class), - connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); sources[i] = new RemoteChannelSource(channel); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index f933840073b..324a0607718 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -89,11 +89,11 @@ public class InputGateFairnessTest { new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0; i < numChannels; i++) { LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(), - resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); } @@ -142,11 +142,11 @@ public class InputGateFairnessTest { new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0; i < numChannels; i++) { LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(), - resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); } @@ -192,7 +192,7 @@ public class InputGateFairnessTest { new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final ConnectionManager connManager = createDummyConnectionManager(); @@ -201,7 +201,7 @@ public class InputGateFairnessTest { for (int i = 0; i < numChannels; i++) { RemoteInputChannel channel = new RemoteInputChannel( gate, i, new ResultPartitionID(), mock(ConnectionID.class), - connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); channels[i] = channel; @@ -247,7 +247,7 @@ public class InputGateFairnessTest { new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final ConnectionManager connManager = createDummyConnectionManager(); @@ -257,7 +257,7 @@ public class InputGateFairnessTest { for (int i = 0; i < numChannels; i++) { RemoteInputChannel channel = new RemoteInputChannel( gate, i, new ResultPartitionID(), mock(ConnectionID.class), - connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); channels[i] = channel; gate.setInputChannel(new IntermediateResultPartitionID(), channel); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 5f7fd823668..16cd90defcf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -38,7 +38,7 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -291,7 +291,7 @@ public class LocalInputChannelTest { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup() + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup() ); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -318,7 +318,7 @@ public class LocalInputChannelTest { partitionManager, new TaskEventDispatcher(), 1, 1, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); @@ -370,7 +370,7 @@ public class LocalInputChannelTest { new ResultPartitionID(), partitionManager, new TaskEventDispatcher(), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); channel.requestSubpartition(0); @@ -411,7 +411,7 @@ public class LocalInputChannelTest { mock(TaskEventDispatcher.class), initialAndMaxRequestBackoff._1(), initialAndMaxRequestBackoff._2(), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); } /** @@ -487,7 +487,7 @@ public class LocalInputChannelTest { subpartitionIndex, numberOfInputChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); // Set buffer pool inputGate.setBufferPool(bufferPool); @@ -502,7 +502,7 @@ public class LocalInputChannelTest { consumedPartitionIds[i], partitionManager, taskEventDispatcher, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup())); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup())); } this.numberOfInputChannels = numberOfInputChannels; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index bced9ce3d6e..d791ced7eb3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.util.TestBufferFactory; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -270,7 +270,7 @@ public class RemoteInputChannelTest { partitionId, mock(ConnectionID.class), connectionManager, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); ch.onFailedPartitionRequest(); @@ -290,7 +290,7 @@ public class RemoteInputChannelTest { new ResultPartitionID(), mock(ConnectionID.class), connManager, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception."))); @@ -401,6 +401,6 @@ public class RemoteInputChannelTest { connectionManager, initialAndMaxRequestBackoff._1(), initialAndMaxRequestBackoff._2(), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 4d7d8844c30..da649cd7350 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -43,7 +43,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -80,7 +80,7 @@ public class SingleInputGateTest { new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 2, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType()); @@ -140,7 +140,7 @@ public class SingleInputGateTest { resultId, ResultPartitionType.PIPELINED, 0, 2, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final BufferPool bufferPool = mock(BufferPool.class); when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); @@ -149,12 +149,12 @@ public class SingleInputGateTest { // Local ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); // Unknown ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); // Set channels inputGate.setInputChannel(localPartitionId.getPartitionId(), local); @@ -195,7 +195,7 @@ public class SingleInputGateTest { ResultPartitionType.PIPELINED, 0, 1, - mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -206,7 +206,7 @@ public class SingleInputGateTest { partitionManager, new TaskEventDispatcher(), new LocalConnectionManager(), - 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); @@ -236,7 +236,7 @@ public class SingleInputGateTest { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); InputChannel unknown = new UnknownInputChannel( inputGate, @@ -246,7 +246,7 @@ public class SingleInputGateTest { new TaskEventDispatcher(), new LocalConnectionManager(), 0, 0, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); @@ -339,7 +339,7 @@ public class SingleInputGateTest { gateDesc, netEnv, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType()); @@ -388,7 +388,7 @@ public class SingleInputGateTest { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); RemoteInputChannel remote = mock(RemoteInputChannel.class); inputGate.setInputChannel(new IntermediateResultPartitionID(), remote); @@ -416,7 +416,7 @@ public class SingleInputGateTest { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); UnknownInputChannel unknown = mock(UnknownInputChannel.class); final ResultPartitionID resultPartitionId = new ResultPartitionID(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 18ad49065c9..0ae6e747920 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -60,7 +60,7 @@ public class TestSingleInputGate { 0, numberOfInputChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); this.inputGate = spy(realGate); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index bc1dd07e629..9884855bed5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -48,13 +48,13 @@ public class UnionInputGateTest { new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 3, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final SingleInputGate ig2 = new SingleInputGate( testTaskName, new JobID(), new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 5, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2}); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java index f5d68020e09..0b7547df33a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java @@ -203,7 +203,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { ResourceID.generate(), taskManagerSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.empty(), false, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 88141d6ca71..f86e7e199c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -64,6 +64,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -206,7 +207,7 @@ public class JobManagerHARecoveryTest extends TestLogger { mySubmittedJobGraphStore, checkpointStateFactory, jobRecoveryTimeout, - new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"), + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), Option.empty()); jobManager = system.actorOf(jobManagerProps); @@ -217,7 +218,7 @@ public class JobManagerHARecoveryTest extends TestLogger { ResourceID.generate(), system, testingHighAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("taskmanager"), true, @@ -381,7 +382,7 @@ public class JobManagerHARecoveryTest extends TestLogger { submittedJobGraphStore, mock(CheckpointRecoveryFactory.class), jobRecoveryTimeout, - new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"), + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), recoveredJobs).withDispatcher(CallingThreadDispatcher.Id()); jobManager = system.actorOf(jobManagerProps); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 6a02d1f0244..51cc469d560 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -626,7 +626,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), TestingJobManager.class, MemoryArchivist.class)._1(); @@ -648,7 +648,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), system, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", scala.Option.empty(), true, @@ -845,7 +845,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -864,7 +864,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, @@ -1057,7 +1057,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -1076,7 +1076,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, @@ -1172,7 +1172,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -1191,7 +1191,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, @@ -1285,7 +1285,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -1307,7 +1307,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index cc93f189519..0ca83ae62d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -95,7 +95,7 @@ public class JobSubmitTest { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index 72c03af5e10..703cd0bf085 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.testingUtils.TestingJobManager; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -201,7 +202,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { submittedJobGraphStore, checkpointRecoveryFactory, AkkaUtils.getDefaultTimeoutAsFiniteDuration(), - new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"), + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), Option.empty()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index d934ea9936d..eec71658f76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -78,7 +78,7 @@ public class TaskManagerMetricsTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 0fced3316f9..4dc5edf8a17 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -86,7 +86,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testUserDefinedVariable() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key = "key"; @@ -111,7 +111,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testUserDefinedVariableOnKeyGroup() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key1 = "key1"; @@ -142,7 +142,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testNameCollisionForKeyAfterGenericGroup() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key = "key"; @@ -169,7 +169,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testNameCollisionForKeyAndValueAfterGenericGroup() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key = "key"; @@ -195,7 +195,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testNameCollisionAfterKeyValueGroup() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key = "key"; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java index bcf77de632d..f23b2f5def0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; @@ -34,7 +33,7 @@ import static org.junit.Assert.assertNotNull; public class TaskIOMetricGroupTest { @Test public void testTaskIOMetricGroup() { - TaskMetricGroup task = new UnregisteredTaskMetricsGroup(); + TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); TaskIOMetricGroup taskIO = task.getIOMetricGroup(); // test counter forwarding diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index 4fa74b33b5f..a4d14c41aad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -29,10 +29,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; @@ -227,6 +227,6 @@ public class TestTaskContext implements TaskContext { @Override public OperatorMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); + return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index 96c8b73cec0..a76f110178b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -26,13 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.TaskContext; @@ -373,7 +372,7 @@ public abstract class BinaryOperatorTestBase extend @Override public OperatorMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); + return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index d2cedb994d9..3820bf91b04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.TaskContext; @@ -368,7 +369,7 @@ public abstract class DriverTestBase extends TestLogger impl @Override public OperatorMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); + return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 718ecfe04e5..148eb0bd960 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -100,7 +101,7 @@ public class DummyEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup(); + return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index f655b127d6a..861cf35423d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -281,7 +282,7 @@ public class MockEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup(); + return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index 141aec68823..2ef82da9359 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.ResettableDriver; @@ -364,7 +365,7 @@ public abstract class UnaryOperatorTestBase extends @Override public OperatorMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); + return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java deleted file mode 100644 index 7065e6b806f..00000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.operators.testutils; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.NoOpMetricRegistry; -import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; - -import java.util.UUID; - -public class UnregisteredTaskMetricsGroup extends TaskMetricGroup { - - private static final MetricRegistry EMPTY_REGISTRY = new NoOpMetricRegistry(); - - - public UnregisteredTaskMetricsGroup() { - super(EMPTY_REGISTRY, new DummyJobMetricGroup(), - new JobVertexID(), new ExecutionAttemptID(), "testtask", 0, 0); - } - - @Override - protected void addMetric(String name, Metric metric) {} - - @Override - public MetricGroup addGroup(String name) { - return new UnregisteredMetricsGroup(); - } - - // ------------------------------------------------------------------------ - - private static class DummyTaskManagerMetricsGroup extends TaskManagerMetricGroup { - - public DummyTaskManagerMetricsGroup() { - super(EMPTY_REGISTRY, "localhost", UUID.randomUUID().toString()); - } - } - - private static class DummyJobMetricGroup extends TaskManagerJobMetricGroup { - - public DummyJobMetricGroup() { - super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob"); - } - } - - public static class DummyTaskIOMetricGroup extends TaskIOMetricGroup { - public DummyTaskIOMetricGroup() { - super(new UnregisteredTaskMetricsGroup()); - } - } - - public static class DummyOperatorMetricGroup extends OperatorMetricGroup { - public DummyOperatorMetricGroup() { - super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup(), new OperatorID(), "testoperator"); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 3050718e353..87cb4a96c54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -97,7 +97,7 @@ public class ResourceManagerTest extends TestLogger { highAvailabilityServices, new HeartbeatServices(1000L, 10000L), slotManager, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, jobLeaderIdService, testingFatalErrorHandler); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 98b5b8b1116..a3c41c59ef5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -98,7 +98,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); @@ -168,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { network, numberOfSlots, highAvailabilityServices, - new TaskManagerMetricGroup(new NoOpMetricRegistry(), connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString())); + new TaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString())); taskManager = actorSystem.actorOf(tmProps); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java index 7429ec5e80c..fadebce485d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -122,7 +122,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index ed06dc0a472..4c7a8cfd0bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -250,7 +250,7 @@ public class TaskManagerStartupTest extends TestLogger { ResourceID.generate(), null, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.empty(), false, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java index 38238cd09af..086ad71b65b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java @@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.taskmanager.CheckpointResponder; @@ -193,7 +193,7 @@ public class JvmExitOnFatalErrorTest { new String[0]), new FileCache(tmInfo.getTmpDirectories()), tmInfo, - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), new NoOpResultPartitionConsumableNotifier(), new NoOpPartitionProducerStateChecker(), executor); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 58022a9c61a..4062749ae94 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -33,12 +33,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DefaultKeyedStateStore; @@ -155,7 +155,7 @@ public abstract class AbstractStreamOperator // --------------- Metrics --------------------------- /** Metric group for the operator. */ - protected transient MetricGroup metrics; + protected transient OperatorMetricGroup metrics; protected transient LatencyGauge latencyGauge; @@ -191,7 +191,7 @@ public abstract class AbstractStreamOperator this.metrics = operatorMetricGroup; } catch (Exception e) { LOG.warn("An error occurred while instantiating task metrics.", e); - this.metrics = new UnregisteredMetricsGroup(); + this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); this.output = output; } Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 609f8b89eff..0c71a53782b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -70,6 +74,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class StreamInputProcessor { + private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class); + private final RecordDeserializer>[] recordDeserializers; private RecordDeserializer> currentRecordDeserializer; @@ -169,7 +175,12 @@ public class StreamInputProcessor { return false; } if (numRecordsIn == null) { - numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + try { + numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + numRecordsIn = new SimpleCounter(); + } } while (true) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 78741471b1b..824acad4889 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.Collection; @@ -72,6 +76,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class StreamTwoInputProcessor { + private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class); + private final RecordDeserializer>[] recordDeserializers; private RecordDeserializer> currentRecordDeserializer; @@ -201,7 +207,12 @@ public class StreamTwoInputProcessor { return false; } if (numRecordsIn == null) { - numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + try { + numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + numRecordsIn = new SimpleCounter(); + } } while (true) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index a44cffb437e..141a6234674 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -21,11 +21,13 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput; @@ -426,7 +428,21 @@ public class OperatorChain> implements Strea StreamStatusProvider streamStatusProvider, OutputTag outputTag) { this.operator = operator; - this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + + { + Counter tmpNumRecordsIn; + try { + OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup(); + ioMetricGroup.reuseInputMetricsForTask(); + ioMetricGroup.reuseOutputMetricsForTask(); + tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + tmpNumRecordsIn = new SimpleCounter(); + } + numRecordsIn = tmpNumRecordsIn; + } + this.streamStatusProvider = streamStatusProvider; this.outputTag = outputTag; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java index d6f5e618450..2618e5329cc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.junit.Test; @@ -101,7 +102,7 @@ public class RichAsyncFunctionTest { }; final String taskName = "foobarTask"; - final MetricGroup metricGroup = mock(MetricGroup.class); + final MetricGroup metricGroup = new UnregisteredMetricsGroup(); final int numberOfParallelSubtasks = 42; final int indexOfSubtask = 43; final int attemptNumber = 1337; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 993bffb484c..a556b18a5e4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -35,8 +35,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.datastream.AsyncDataStream; @@ -658,7 +658,7 @@ public class AsyncWaitOperatorTest extends TestLogger { final Configuration taskConfiguration = new Configuration(); final ExecutionConfig executionConfig = new ExecutionConfig(); - final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup(); + final TaskMetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index eacded6dc5d..0af1471a09a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -272,7 +272,7 @@ public class InterruptSensitiveRestoreTest { new String[0]), new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }), new TestingTaskManagerRuntimeInfo(), - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), mock(Executor.class)); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 277ca51d7c5..ee7337ce08a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -47,8 +47,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.runtime.query.KvStateRegistry; @@ -363,6 +363,6 @@ public class StreamMockEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup(); + return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 5480ce7e77e..e3e51aa0d39 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -168,7 +168,7 @@ public class StreamTaskTerminationTest extends TestLogger { new String[0]), mock(FileCache.class), taskManagerRuntimeInfo, - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), Executors.directExecutor()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index d0ea7147ee6..8ce8b03a61b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -58,9 +58,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -935,7 +935,7 @@ public class StreamTaskTest extends TestLogger { libCache, mock(FileCache.class), new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}), - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), consumableNotifier, partitionProducerStateChecker, executor); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index d61b95d47b5..b1127d5be07 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -244,7 +244,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { new String[0]), new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }), new TestingTaskManagerRuntimeInfo(), - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), Executors.directExecutor()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 94aed2a439f..29516dcab5a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -146,7 +146,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 7c53d523a9e..cefadb4ef7a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -218,7 +218,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { ResourceID.generate(), taskManagerSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.empty(), false, diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 8e97e9d17cf..357f7afee9d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -282,7 +282,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { ResourceID.generate(), tmActorSystem[i], highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.empty(), false, diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index ecd0beac572..13d6804d40e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -121,7 +121,7 @@ public class ProcessFailureCancelingITCase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index c86f21fddc2..9710c2080c0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -122,7 +122,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -144,7 +144,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 031a4cb013f..39ddfa76b25 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -257,7 +257,7 @@ public class YarnResourceManagerTest extends TestLogger { rmLeaderElectionService = new TestingLeaderElectionService(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor); - metricRegistry = new NoOpMetricRegistry(); + metricRegistry = NoOpMetricRegistry.INSTANCE; slotManager = new SlotManager( new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()), Time.seconds(10), Time.seconds(10), Time.minutes(1)); -- GitLab