diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java index 2368091ee204c186388423e2118c9dcc3451b4b0..383eb13ea339bd3517b0558c541957418e21c5cf 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internal; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue; import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; @@ -716,7 +716,7 @@ public class KafkaConsumerThreadTest { handover, new Properties(), unassignedPartitionsQueue, - mock(MetricGroup.class), + new UnregisteredMetricsGroup(), new KafkaConsumerCallBridge(), "test-kafka-consumer-thread", 0, diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index f518d178b5dfeae50dc059136aa402f0fd206086..98816e462c819ea255186dc9807d8274cc2fc773 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.storm.util.SplitStreamType; @@ -372,7 +372,7 @@ public class BoltWrapperTest extends AbstractTest { Environment env = mock(Environment.class); when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader()); - when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); + when(env.getMetricGroup()).thenReturn(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()); when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo()); StreamTask mockTask = mock(StreamTask.class); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 75a844caa5a77b05522ef5b9897497936c48c819..e6cfdda968c7c989ea0fa27b4269f5f4ed5b8232 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -194,7 +194,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.apply(webMonitor[i].getRestAddress()), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index c4c4445a66af2b03f7112ec1ddedbd7fa676eddd..6d0de74ba1ff4782a598e78635c5a641a1290d01 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.blob.BlobServer; @@ -78,6 +77,7 @@ import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.query.UnknownKvStateLocation; @@ -264,8 +264,8 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast this.jobManagerMetricGroup = jobManagerMetricGroup; this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph); } else { - this.jobManagerMetricGroup = new UnregisteredMetricsGroup(); - this.jobMetricGroup = new UnregisteredMetricsGroup(); + this.jobManagerMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(); + this.jobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(); } log.info("Initializing job {} ({}).", jobName, jid); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java similarity index 83% rename from flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java index 46d6548cd02da516a6ab5673147d08da675ede5e..c161aa261b3f3d5d311eeb466d27a3f9acf19207 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java @@ -26,13 +26,16 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormats; import javax.annotation.Nullable; /** - * Metric registry which does nothing and is intended for testing purposes. + * Metric registry which does nothing. */ public class NoOpMetricRegistry implements MetricRegistry { + private static final char delimiter = '.'; + private static final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration()); - final char delimiter = ','; + public static final MetricRegistry INSTANCE = new NoOpMetricRegistry(); - final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration()); + private NoOpMetricRegistry() { + } @Override public char getDelimiter() { @@ -50,10 +53,12 @@ public class NoOpMetricRegistry implements MetricRegistry { } @Override - public void register(Metric metric, String metricName, AbstractMetricGroup group) {} + public void register(Metric metric, String metricName, AbstractMetricGroup group) { + } @Override - public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {} + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { + } @Override public ScopeFormats getScopeFormats() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java new file mode 100644 index 0000000000000000000000000000000000000000..3869aa642f918c32bf18c77c9277cb104b434d89 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; + +/** + * A collection of safe drop-in replacements for existing {@link ComponentMetricGroup}s. + */ +public class UnregisteredMetricGroups { + + private UnregisteredMetricGroups() { + } + + public static JobManagerMetricGroup createUnregisteredJobManagerMetricGroup() { + return new UnregisteredJobManagerMetricGroup(); + } + + public static JobManagerJobMetricGroup createUnregisteredJobManagerJobMetricGroup() { + return new UnregisteredJobManagerJobMetricGroup(); + } + + public static TaskManagerMetricGroup createUnregisteredTaskManagerMetricGroup() { + return new UnregisteredTaskManagerMetricGroup(); + } + + public static TaskManagerJobMetricGroup createUnregisteredTaskManagerJobMetricGroup() { + return new UnregisteredTaskManagerJobMetricGroup(); + } + + public static TaskMetricGroup createUnregisteredTaskMetricGroup() { + return new UnregisteredTaskMetricGroup(); + } + + public static OperatorMetricGroup createUnregisteredOperatorMetricGroup() { + return new UnregisteredOperatorMetricGroup(); + } + + /** + * A safe drop-in replacement for {@link JobManagerMetricGroup}s. + */ + public static class UnregisteredJobManagerMetricGroup extends JobManagerMetricGroup { + private static final String DEFAULT_HOST_NAME = "UnregisteredHost"; + + private UnregisteredJobManagerMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME); + } + + @Override + public JobManagerJobMetricGroup addJob(JobGraph job) { + return createUnregisteredJobManagerJobMetricGroup(); + } + } + + /** + * A safe drop-in replacement for {@link JobManagerJobMetricGroup}s. + */ + public static class UnregisteredJobManagerJobMetricGroup extends JobManagerJobMetricGroup { + private static final JobID DEFAULT_JOB_ID = new JobID(0, 0); + private static final String DEFAULT_JOB_NAME = "UnregisteredJob"; + + protected UnregisteredJobManagerJobMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, new UnregisteredJobManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME); + } + } + + /** + * A safe drop-in replacement for {@link TaskManagerMetricGroup}s. + */ + public static class UnregisteredTaskManagerMetricGroup extends TaskManagerMetricGroup { + private static final String DEFAULT_HOST_NAME = "UnregisteredHost"; + private static final String DEFAULT_TASKMANAGER_ID = "0"; + + protected UnregisteredTaskManagerMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, DEFAULT_TASKMANAGER_ID); + } + + @Override + public TaskMetricGroup addTaskForJob( + final JobID jobId, + final String jobName, + final JobVertexID jobVertexId, + final ExecutionAttemptID executionAttemptId, + final String taskName, + final int subtaskIndex, + final int attemptNumber) { + return createUnregisteredTaskMetricGroup(); + } + } + + /** + * A safe drop-in replacement for {@link TaskManagerJobMetricGroup}s. + */ + public static class UnregisteredTaskManagerJobMetricGroup extends TaskManagerJobMetricGroup { + private static final JobID DEFAULT_JOB_ID = new JobID(0, 0); + private static final String DEFAULT_JOB_NAME = "UnregisteredJob"; + + protected UnregisteredTaskManagerJobMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME); + } + + @Override + public TaskMetricGroup addTask( + final JobVertexID jobVertexId, + final ExecutionAttemptID executionAttemptID, + final String taskName, + final int subtaskIndex, + final int attemptNumber) { + return createUnregisteredTaskMetricGroup(); + } + } + + /** + * A safe drop-in replacement for {@link TaskMetricGroup}s. + */ + public static class UnregisteredTaskMetricGroup extends TaskMetricGroup { + private static final JobVertexID DEFAULT_VERTEX_ID = new JobVertexID(0, 0); + private static final ExecutionAttemptID DEFAULT_ATTEMPT_ID = new ExecutionAttemptID(0, 0); + private static final String DEFAULT_TASK_NAME = "UnregisteredTask"; + + protected UnregisteredTaskMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerJobMetricGroup(), + DEFAULT_VERTEX_ID, DEFAULT_ATTEMPT_ID, DEFAULT_TASK_NAME, 0, 0); + } + + @Override + public OperatorMetricGroup addOperator(OperatorID operatorID, String name) { + return createUnregisteredOperatorMetricGroup(); + } + } + + /** + * A safe drop-in replacement for {@link OperatorMetricGroup}s. + */ + public static class UnregisteredOperatorMetricGroup extends OperatorMetricGroup { + private static final OperatorID DEFAULT_OPERATOR_ID = new OperatorID(0, 0); + private static final String DEFAULT_OPERATOR_NAME = "UnregisteredOperator"; + + protected UnregisteredOperatorMetricGroup() { + super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskMetricGroup(), DEFAULT_OPERATOR_ID, DEFAULT_OPERATOR_NAME); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index bd052f5980e1c1ecbfe1e632714bf7a9209857c4..bb253abb561e2c806a1e7ff7c8b67be9883518fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -29,12 +29,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.reader.MutableReader; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; @@ -107,10 +109,22 @@ public class DataSinkTask extends AbstractInvokable { LOG.debug(getLogString("Starting data sink operator")); RuntimeContext ctx = createRuntimeContext(); - final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); - ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask(); - ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask(); - + + final Counter numRecordsIn; + { + Counter tmpNumRecordsIn; + try { + OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup(); + ioMetricGroup.reuseInputMetricsForTask(); + ioMetricGroup.reuseOutputMetricsForTask(); + tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + tmpNumRecordsIn = new SimpleCounter(); + } + numRecordsIn = tmpNumRecordsIn; + } + if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){ ((RichOutputFormat) this.format).setRuntimeContext(ctx); LOG.debug(getLogString("Rich Sink detected. Initializing runtime context.")); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 2600b2c69468af9c782a4381d4d1f4b66794585f..14378770d8b97f492a513f96f9e967c5d3fc627c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -27,12 +27,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; @@ -102,12 +104,25 @@ public class DataSourceTask extends AbstractInvokable { LOG.debug(getLogString("Starting data source operator")); RuntimeContext ctx = createRuntimeContext(); - Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); - ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask(); - Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter(); - if (this.config.getNumberOfChainedStubs() == 0) { - ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask(); + + final Counter numRecordsOut; + { + Counter tmpNumRecordsOut; + try { + OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup(); + ioMetricGroup.reuseInputMetricsForTask(); + if (this.config.getNumberOfChainedStubs() == 0) { + ioMetricGroup.reuseOutputMetricsForTask(); + } + tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + tmpNumRecordsOut = new SimpleCounter(); + } + numRecordsOut = tmpNumRecordsOut; } + + Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { ((RichInputFormat) this.format).setRuntimeContext(ctx); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java index cb94d25d3e15499f5b0747741649b3a74b39950b..c9775036e2ae4c989a983a60446a8d1b5d8ed1d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java @@ -45,7 +45,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -344,7 +344,7 @@ public class IndividualRestartsConcurrencyTest extends TestLogger { 1, allVertices, checkpointCoordinatorConfiguration, - new UnregisteredTaskMetricsGroup())); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup())); final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java index 64f82f3fd0ddc97474822cdcfc430d9185ac21d7..6f98119ed287f7d39ef6acc777f4c1b954986a41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -63,11 +63,11 @@ public class InputGateConcurrentTest { new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0; i < numChannels; i++) { LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(), - resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); partitions[i] = new PipelinedSubpartition(0, resultPartition); @@ -99,12 +99,12 @@ public class InputGateConcurrentTest { 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0; i < numChannels; i++) { RemoteInputChannel channel = new RemoteInputChannel( gate, i, new ResultPartitionID(), mock(ConnectionID.class), - connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); sources[i] = new RemoteChannelSource(channel); @@ -148,7 +148,7 @@ public class InputGateConcurrentTest { 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0, local = 0; i < numChannels; i++) { if (localOrRemote.get(i)) { @@ -158,14 +158,14 @@ public class InputGateConcurrentTest { sources[i] = new PipelinedSubpartitionSource(psp); LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(), - resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); } else { //remote channel RemoteInputChannel channel = new RemoteInputChannel( gate, i, new ResultPartitionID(), mock(ConnectionID.class), - connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); sources[i] = new RemoteChannelSource(channel); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index f933840073b9db1d60e3b44de7eb615d4d32a539..324a0607718a7d53b0779e99417d9ad98af85b64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -89,11 +89,11 @@ public class InputGateFairnessTest { new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0; i < numChannels; i++) { LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(), - resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); } @@ -142,11 +142,11 @@ public class InputGateFairnessTest { new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); for (int i = 0; i < numChannels; i++) { LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(), - resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); } @@ -192,7 +192,7 @@ public class InputGateFairnessTest { new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final ConnectionManager connManager = createDummyConnectionManager(); @@ -201,7 +201,7 @@ public class InputGateFairnessTest { for (int i = 0; i < numChannels; i++) { RemoteInputChannel channel = new RemoteInputChannel( gate, i, new ResultPartitionID(), mock(ConnectionID.class), - connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); channels[i] = channel; @@ -247,7 +247,7 @@ public class InputGateFairnessTest { new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final ConnectionManager connManager = createDummyConnectionManager(); @@ -257,7 +257,7 @@ public class InputGateFairnessTest { for (int i = 0; i < numChannels; i++) { RemoteInputChannel channel = new RemoteInputChannel( gate, i, new ResultPartitionID(), mock(ConnectionID.class), - connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); channels[i] = channel; gate.setInputChannel(new IntermediateResultPartitionID(), channel); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 5f7fd82366855070780f86cf195875ff5fd95659..16cd90defcf16b583db9223bf8c3ab7d5aa68414 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -38,7 +38,7 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -291,7 +291,7 @@ public class LocalInputChannelTest { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup() + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup() ); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -318,7 +318,7 @@ public class LocalInputChannelTest { partitionManager, new TaskEventDispatcher(), 1, 1, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); gate.setInputChannel(new IntermediateResultPartitionID(), channel); @@ -370,7 +370,7 @@ public class LocalInputChannelTest { new ResultPartitionID(), partitionManager, new TaskEventDispatcher(), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); channel.requestSubpartition(0); @@ -411,7 +411,7 @@ public class LocalInputChannelTest { mock(TaskEventDispatcher.class), initialAndMaxRequestBackoff._1(), initialAndMaxRequestBackoff._2(), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); } /** @@ -487,7 +487,7 @@ public class LocalInputChannelTest { subpartitionIndex, numberOfInputChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); // Set buffer pool inputGate.setBufferPool(bufferPool); @@ -502,7 +502,7 @@ public class LocalInputChannelTest { consumedPartitionIds[i], partitionManager, taskEventDispatcher, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup())); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup())); } this.numberOfInputChannels = numberOfInputChannels; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index bced9ce3d6eaa834c5cf0b67acf8f371baf9d8a7..d791ced7eb384c8c6263958fb7ced342f15f65cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.util.TestBufferFactory; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -270,7 +270,7 @@ public class RemoteInputChannelTest { partitionId, mock(ConnectionID.class), connectionManager, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); ch.onFailedPartitionRequest(); @@ -290,7 +290,7 @@ public class RemoteInputChannelTest { new ResultPartitionID(), mock(ConnectionID.class), connManager, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception."))); @@ -401,6 +401,6 @@ public class RemoteInputChannelTest { connectionManager, initialAndMaxRequestBackoff._1(), initialAndMaxRequestBackoff._2(), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 4d7d8844c305f9989bb09bea16b085dda7d85443..da649cd735055797daafcebee946daa2ad84ac34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -43,7 +43,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -80,7 +80,7 @@ public class SingleInputGateTest { new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 2, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType()); @@ -140,7 +140,7 @@ public class SingleInputGateTest { resultId, ResultPartitionType.PIPELINED, 0, 2, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final BufferPool bufferPool = mock(BufferPool.class); when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); @@ -149,12 +149,12 @@ public class SingleInputGateTest { // Local ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); // Unknown ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); // Set channels inputGate.setInputChannel(localPartitionId.getPartitionId(), local); @@ -195,7 +195,7 @@ public class SingleInputGateTest { ResultPartitionType.PIPELINED, 0, 1, - mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -206,7 +206,7 @@ public class SingleInputGateTest { partitionManager, new TaskEventDispatcher(), new LocalConnectionManager(), - 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); @@ -236,7 +236,7 @@ public class SingleInputGateTest { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); InputChannel unknown = new UnknownInputChannel( inputGate, @@ -246,7 +246,7 @@ public class SingleInputGateTest { new TaskEventDispatcher(), new LocalConnectionManager(), 0, 0, - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); @@ -339,7 +339,7 @@ public class SingleInputGateTest { gateDesc, netEnv, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType()); @@ -388,7 +388,7 @@ public class SingleInputGateTest { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); RemoteInputChannel remote = mock(RemoteInputChannel.class); inputGate.setInputChannel(new IntermediateResultPartitionID(), remote); @@ -416,7 +416,7 @@ public class SingleInputGateTest { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); UnknownInputChannel unknown = mock(UnknownInputChannel.class); final ResultPartitionID resultPartitionId = new ResultPartitionID(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 18ad49065c9f554d1861d6197b316ec0f6678d3e..0ae6e747920ea6116ea8f023677aaa56c3c2f939 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -60,7 +60,7 @@ public class TestSingleInputGate { 0, numberOfInputChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); this.inputGate = spy(realGate); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index bc1dd07e6296bd2d9fc5743c929ea52942183f6b..9884855bed5588f8856d799897df52fa2f5f02fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -48,13 +48,13 @@ public class UnionInputGateTest { new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 3, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final SingleInputGate ig2 = new SingleInputGate( testTaskName, new JobID(), new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 5, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2}); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java index f5d68020e09fce3f1ae3e03905ded04395a8ab00..0b7547df33a2318ed3c6d78827eaa984a1f0a925 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java @@ -203,7 +203,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { ResourceID.generate(), taskManagerSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.empty(), false, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 88141d6ca710e125d65d81d9ec4a182244182307..f86e7e199c7225a392181fe6558cf584d89472db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -64,6 +64,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -206,7 +207,7 @@ public class JobManagerHARecoveryTest extends TestLogger { mySubmittedJobGraphStore, checkpointStateFactory, jobRecoveryTimeout, - new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"), + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), Option.empty()); jobManager = system.actorOf(jobManagerProps); @@ -217,7 +218,7 @@ public class JobManagerHARecoveryTest extends TestLogger { ResourceID.generate(), system, testingHighAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("taskmanager"), true, @@ -381,7 +382,7 @@ public class JobManagerHARecoveryTest extends TestLogger { submittedJobGraphStore, mock(CheckpointRecoveryFactory.class), jobRecoveryTimeout, - new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"), + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), recoveredJobs).withDispatcher(CallingThreadDispatcher.Id()); jobManager = system.actorOf(jobManagerProps); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 6a02d1f0244824938ed4d46c34c7198ef4e841d6..51cc469d560d83a863e024759620f50fa2225428 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -626,7 +626,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), TestingJobManager.class, MemoryArchivist.class)._1(); @@ -648,7 +648,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), system, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", scala.Option.empty(), true, @@ -845,7 +845,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -864,7 +864,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, @@ -1057,7 +1057,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -1076,7 +1076,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, @@ -1172,7 +1172,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -1191,7 +1191,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, @@ -1285,7 +1285,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -1307,7 +1307,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index cc93f18951934ff52e7a89c9c47a055e89ff2dea..0ca83ae62d1f29b5909a53e6dfcfd5268aa7947c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -95,7 +95,7 @@ public class JobSubmitTest { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index 72c03af5e103564a7c36250cfddadf0c616d7765..703cd0bf08554c6dbbd8c9e255ec4ab29e56f559 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.testingUtils.TestingJobManager; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -201,7 +202,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { submittedJobGraphStore, checkpointRecoveryFactory, AkkaUtils.getDefaultTimeoutAsFiniteDuration(), - new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"), + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), Option.empty()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index d934ea9936d4b4faa887f693c751799a1f0fb292..eec71658f76dfa4b51f7fef79099621ed87c69d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -78,7 +78,7 @@ public class TaskManagerMetricsTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 0fced3316f96d714877c01dc8c425b98e3b3c1bb..4dc5edf8a17b66f3b6c300d474b61c42ced31d97 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -86,7 +86,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testUserDefinedVariable() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key = "key"; @@ -111,7 +111,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testUserDefinedVariableOnKeyGroup() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key1 = "key1"; @@ -142,7 +142,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testNameCollisionForKeyAfterGenericGroup() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key = "key"; @@ -169,7 +169,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testNameCollisionForKeyAndValueAfterGenericGroup() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key = "key"; @@ -195,7 +195,7 @@ public class MetricGroupTest extends TestLogger { */ @Test public void testNameCollisionAfterKeyValueGroup() { - MetricRegistry registry = new NoOpMetricRegistry(); + MetricRegistry registry = NoOpMetricRegistry.INSTANCE; GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); String key = "key"; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java index bcf77de632d7ec1c2a023cb1f54eca97f5c4390f..f23b2f5def0fb0b23ad23a9e067e572e2f4fdf8d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; @@ -34,7 +33,7 @@ import static org.junit.Assert.assertNotNull; public class TaskIOMetricGroupTest { @Test public void testTaskIOMetricGroup() { - TaskMetricGroup task = new UnregisteredTaskMetricsGroup(); + TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); TaskIOMetricGroup taskIO = task.getIOMetricGroup(); // test counter forwarding diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index 4fa74b33b5f622b4efbd1b63ed4edfec8ef8ebf5..a4d14c41aad91b5a28a3646c8b1086c18d8a5b6f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -29,10 +29,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; @@ -227,6 +227,6 @@ public class TestTaskContext implements TaskContext { @Override public OperatorMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); + return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index 96c8b73cec0f23240b8d83e222bb66aaf858841f..a76f110178bfd065af40ecee140ceb5af2387867 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -26,13 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.TaskContext; @@ -373,7 +372,7 @@ public abstract class BinaryOperatorTestBase extend @Override public OperatorMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); + return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index d2cedb994d97aac28a302b472fa6f4a4f2ab5ff3..3820bf91b04bec8d391fe4cfe2049277c7ff85af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.TaskContext; @@ -368,7 +369,7 @@ public abstract class DriverTestBase extends TestLogger impl @Override public OperatorMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); + return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 718ecfe04e5710d29770a79fb50acf1b827bcbb7..148eb0bd960ca2920fc1d7c30eaa757bd57b2f37 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -100,7 +101,7 @@ public class DummyEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup(); + return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index f655b127d6abfc8176e46ad6779ee3b7bc5e07e7..861cf35423d3b670345eb80976f33bc93c9f68c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -281,7 +282,7 @@ public class MockEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup(); + return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index 141aec68823252895fbb4bf51aa2a8638528ce22..2ef82da935979a6903b5782f11a041336a0345fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.ResettableDriver; @@ -364,7 +365,7 @@ public abstract class UnaryOperatorTestBase extends @Override public OperatorMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); + return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java deleted file mode 100644 index 7065e6b806f11442759b2ba7ca6674d8574d5980..0000000000000000000000000000000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.operators.testutils; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.NoOpMetricRegistry; -import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; - -import java.util.UUID; - -public class UnregisteredTaskMetricsGroup extends TaskMetricGroup { - - private static final MetricRegistry EMPTY_REGISTRY = new NoOpMetricRegistry(); - - - public UnregisteredTaskMetricsGroup() { - super(EMPTY_REGISTRY, new DummyJobMetricGroup(), - new JobVertexID(), new ExecutionAttemptID(), "testtask", 0, 0); - } - - @Override - protected void addMetric(String name, Metric metric) {} - - @Override - public MetricGroup addGroup(String name) { - return new UnregisteredMetricsGroup(); - } - - // ------------------------------------------------------------------------ - - private static class DummyTaskManagerMetricsGroup extends TaskManagerMetricGroup { - - public DummyTaskManagerMetricsGroup() { - super(EMPTY_REGISTRY, "localhost", UUID.randomUUID().toString()); - } - } - - private static class DummyJobMetricGroup extends TaskManagerJobMetricGroup { - - public DummyJobMetricGroup() { - super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob"); - } - } - - public static class DummyTaskIOMetricGroup extends TaskIOMetricGroup { - public DummyTaskIOMetricGroup() { - super(new UnregisteredTaskMetricsGroup()); - } - } - - public static class DummyOperatorMetricGroup extends OperatorMetricGroup { - public DummyOperatorMetricGroup() { - super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup(), new OperatorID(), "testoperator"); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 3050718e3538ca5ccfcc7ac4785cf7778a9d597d..87cb4a96c546bcfe7672474d6a64c0ccc8911084 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -97,7 +97,7 @@ public class ResourceManagerTest extends TestLogger { highAvailabilityServices, new HeartbeatServices(1000L, 10000L), slotManager, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, jobLeaderIdService, testingFatalErrorHandler); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 98b5b8b11169327a401ca402727a3177c65b06ac..a3c41c59ef5b7667b96791d5a13ae3a3f7b9f2f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -98,7 +98,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); @@ -168,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { network, numberOfSlots, highAvailabilityServices, - new TaskManagerMetricGroup(new NoOpMetricRegistry(), connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString())); + new TaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString())); taskManager = actorSystem.actorOf(tmProps); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java index 7429ec5e80c5de402299350c65ab1a53ebd7d6ee..fadebce485dd938d22ead6a21657febea0ac6737 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -122,7 +122,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index ed06dc0a472fbf3fcfa827fdf84949c3b7d99a38..4c7a8cfd0bd46477094e48e58b0e00371df18095 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -250,7 +250,7 @@ public class TaskManagerStartupTest extends TestLogger { ResourceID.generate(), null, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.empty(), false, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java index 38238cd09af3686ff40b724828aaac49eab31a95..086ad71b65b12028d0b55f54e5d0e5aa3bb8a167 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java @@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.taskmanager.CheckpointResponder; @@ -193,7 +193,7 @@ public class JvmExitOnFatalErrorTest { new String[0]), new FileCache(tmInfo.getTmpDirectories()), tmInfo, - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), new NoOpResultPartitionConsumableNotifier(), new NoOpPartitionProducerStateChecker(), executor); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 58022a9c61ae72897723359419fe9186df328228..4062749ae94947e0438573fface8773685851f6a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -33,12 +33,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DefaultKeyedStateStore; @@ -155,7 +155,7 @@ public abstract class AbstractStreamOperator // --------------- Metrics --------------------------- /** Metric group for the operator. */ - protected transient MetricGroup metrics; + protected transient OperatorMetricGroup metrics; protected transient LatencyGauge latencyGauge; @@ -191,7 +191,7 @@ public abstract class AbstractStreamOperator this.metrics = operatorMetricGroup; } catch (Exception e) { LOG.warn("An error occurred while instantiating task metrics.", e); - this.metrics = new UnregisteredMetricsGroup(); + this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); this.output = output; } Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 609f8b89eff468f9e00230829f5c4c987782c404..0c71a53782be522d8df2214330010e0686d4d3d8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -70,6 +74,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class StreamInputProcessor { + private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class); + private final RecordDeserializer>[] recordDeserializers; private RecordDeserializer> currentRecordDeserializer; @@ -169,7 +175,12 @@ public class StreamInputProcessor { return false; } if (numRecordsIn == null) { - numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + try { + numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + numRecordsIn = new SimpleCounter(); + } } while (true) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 78741471b1bc50cdbb81dfa867c587051519a9a7..824acad48896736b5c8a1aaef5698ce9eab79346 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.Collection; @@ -72,6 +76,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class StreamTwoInputProcessor { + private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class); + private final RecordDeserializer>[] recordDeserializers; private RecordDeserializer> currentRecordDeserializer; @@ -201,7 +207,12 @@ public class StreamTwoInputProcessor { return false; } if (numRecordsIn == null) { - numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + try { + numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + numRecordsIn = new SimpleCounter(); + } } while (true) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index a44cffb437e7df4177d12aca8bb97cab06714821..141a62346746d5f778298ef1dc47f13c8d0d278b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -21,11 +21,13 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput; @@ -426,7 +428,21 @@ public class OperatorChain> implements Strea StreamStatusProvider streamStatusProvider, OutputTag outputTag) { this.operator = operator; - this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + + { + Counter tmpNumRecordsIn; + try { + OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup(); + ioMetricGroup.reuseInputMetricsForTask(); + ioMetricGroup.reuseOutputMetricsForTask(); + tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + tmpNumRecordsIn = new SimpleCounter(); + } + numRecordsIn = tmpNumRecordsIn; + } + this.streamStatusProvider = streamStatusProvider; this.outputTag = outputTag; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java index d6f5e6184501a39709e39a4d53d23921e266ccac..2618e5329cc30906e2eac7d2c2197e8166f3213a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.junit.Test; @@ -101,7 +102,7 @@ public class RichAsyncFunctionTest { }; final String taskName = "foobarTask"; - final MetricGroup metricGroup = mock(MetricGroup.class); + final MetricGroup metricGroup = new UnregisteredMetricsGroup(); final int numberOfParallelSubtasks = 42; final int indexOfSubtask = 43; final int attemptNumber = 1337; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 993bffb484c77f9aeed64517b75a7bc35ba38a70..a556b18a5e4039e3bd80163847096835c869ad30 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -35,8 +35,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.datastream.AsyncDataStream; @@ -658,7 +658,7 @@ public class AsyncWaitOperatorTest extends TestLogger { final Configuration taskConfiguration = new Configuration(); final ExecutionConfig executionConfig = new ExecutionConfig(); - final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup(); + final TaskMetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index eacded6dc5dcb02c4e5a676ad6b9ad12699c5ba9..0af1471a09a20508267c8abc3420a6c6cf37a850 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -272,7 +272,7 @@ public class InterruptSensitiveRestoreTest { new String[0]), new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }), new TestingTaskManagerRuntimeInfo(), - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), mock(Executor.class)); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 277ca51d7c5fc3637fe6e3180236636ddbf1c83d..ee7337ce08ae8d1302cb75c7689b99a464e1e721 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -47,8 +47,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.runtime.query.KvStateRegistry; @@ -363,6 +363,6 @@ public class StreamMockEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup(); + return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 5480ce7e77ec3610138d9fa3fcc6ddfbecfddb97..e3e51aa0d39ee681d3034280156e7da50b18ba73 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -168,7 +168,7 @@ public class StreamTaskTerminationTest extends TestLogger { new String[0]), mock(FileCache.class), taskManagerRuntimeInfo, - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), Executors.directExecutor()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index d0ea7147ee68824129b153e500bd61acda930f6e..8ce8b03a61b6c9e71c846ae055f5f401fbd80edc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -58,9 +58,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -935,7 +935,7 @@ public class StreamTaskTest extends TestLogger { libCache, mock(FileCache.class), new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}), - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), consumableNotifier, partitionProducerStateChecker, executor); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index d61b95d47b5f99cb43f4fac004673623f17df2f2..b1127d5be07db2ae32f50092ccf4951b01251e56 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -244,7 +244,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { new String[0]), new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }), new TestingTaskManagerRuntimeInfo(), - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), Executors.directExecutor()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 94aed2a439f271bc860407f7d595c002fa26f5e5..29516dcab5a901cbd2d731376e2a4d7450d5739a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -146,7 +146,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 7c53d523a9eb0e95ee7bc7456eed0d598fa158b7..cefadb4ef7a6ab162da4eb9f1567a6dc44745122 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -218,7 +218,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { ResourceID.generate(), taskManagerSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.empty(), false, diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 8e97e9d17cf0e154d85172115e79111aa9ef84fe..357f7afee9da5ece279a1de873e0f7a703f63ae2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -282,7 +282,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { ResourceID.generate(), tmActorSystem[i], highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.empty(), false, diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index ecd0beac5727c3687e08c32fd052c69423f65543..13d6804d40e465a128eb152316ee453e58fb469e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -121,7 +121,7 @@ public class ProcessFailureCancelingITCase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index c86f21fddc282958f7be94b47302f583aac416d7..9710c2080c07780f2a8ac08da5048c21ec6bd7db 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -122,7 +122,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -144,7 +144,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 031a4cb013f837121c13e81de8310c410df13b00..39ddfa76b2597e43be8754ef140e177bf86fa14e 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -257,7 +257,7 @@ public class YarnResourceManagerTest extends TestLogger { rmLeaderElectionService = new TestingLeaderElectionService(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor); - metricRegistry = new NoOpMetricRegistry(); + metricRegistry = NoOpMetricRegistry.INSTANCE; slotManager = new SlotManager( new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()), Time.seconds(10), Time.seconds(10), Time.minutes(1));