提交 0bf0fdc2 编写于 作者: Z zentol

[FLINK-8213][metrics] Improve fallback behaviors

This closes #8213.
上级 493c2857
......@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
......@@ -716,7 +716,7 @@ public class KafkaConsumerThreadTest {
handover,
new Properties(),
unassignedPartitionsQueue,
mock(MetricGroup.class),
new UnregisteredMetricsGroup(),
new KafkaConsumerCallBridge(),
"test-kafka-consumer-thread",
0,
......
......@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.util.SplitStreamType;
......@@ -372,7 +372,7 @@ public class BoltWrapperTest extends AbstractTest {
Environment env = mock(Environment.class);
when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0));
when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
when(env.getMetricGroup()).thenReturn(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup());
when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
StreamTask<?, ?> mockTask = mock(StreamTask.class);
......
......@@ -194,7 +194,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.apply(webMonitor[i].getRestAddress()),
JobManager.class,
MemoryArchivist.class)._1();
......
......@@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.blob.BlobServer;
......@@ -78,6 +77,7 @@ import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
......@@ -264,8 +264,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
this.jobManagerMetricGroup = jobManagerMetricGroup;
this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
} else {
this.jobManagerMetricGroup = new UnregisteredMetricsGroup();
this.jobMetricGroup = new UnregisteredMetricsGroup();
this.jobManagerMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
this.jobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
}
log.info("Initializing job {} ({}).", jobName, jid);
......
......@@ -26,13 +26,16 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import javax.annotation.Nullable;
/**
* Metric registry which does nothing and is intended for testing purposes.
* Metric registry which does nothing.
*/
public class NoOpMetricRegistry implements MetricRegistry {
private static final char delimiter = '.';
private static final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration());
final char delimiter = ',';
public static final MetricRegistry INSTANCE = new NoOpMetricRegistry();
final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration());
private NoOpMetricRegistry() {
}
@Override
public char getDelimiter() {
......@@ -50,10 +53,12 @@ public class NoOpMetricRegistry implements MetricRegistry {
}
@Override
public void register(Metric metric, String metricName, AbstractMetricGroup group) {}
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
}
@Override
public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {}
public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
}
@Override
public ScopeFormats getScopeFormats() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
/**
* A collection of safe drop-in replacements for existing {@link ComponentMetricGroup}s.
*/
public class UnregisteredMetricGroups {
private UnregisteredMetricGroups() {
}
public static JobManagerMetricGroup createUnregisteredJobManagerMetricGroup() {
return new UnregisteredJobManagerMetricGroup();
}
public static JobManagerJobMetricGroup createUnregisteredJobManagerJobMetricGroup() {
return new UnregisteredJobManagerJobMetricGroup();
}
public static TaskManagerMetricGroup createUnregisteredTaskManagerMetricGroup() {
return new UnregisteredTaskManagerMetricGroup();
}
public static TaskManagerJobMetricGroup createUnregisteredTaskManagerJobMetricGroup() {
return new UnregisteredTaskManagerJobMetricGroup();
}
public static TaskMetricGroup createUnregisteredTaskMetricGroup() {
return new UnregisteredTaskMetricGroup();
}
public static OperatorMetricGroup createUnregisteredOperatorMetricGroup() {
return new UnregisteredOperatorMetricGroup();
}
/**
* A safe drop-in replacement for {@link JobManagerMetricGroup}s.
*/
public static class UnregisteredJobManagerMetricGroup extends JobManagerMetricGroup {
private static final String DEFAULT_HOST_NAME = "UnregisteredHost";
private UnregisteredJobManagerMetricGroup() {
super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME);
}
@Override
public JobManagerJobMetricGroup addJob(JobGraph job) {
return createUnregisteredJobManagerJobMetricGroup();
}
}
/**
* A safe drop-in replacement for {@link JobManagerJobMetricGroup}s.
*/
public static class UnregisteredJobManagerJobMetricGroup extends JobManagerJobMetricGroup {
private static final JobID DEFAULT_JOB_ID = new JobID(0, 0);
private static final String DEFAULT_JOB_NAME = "UnregisteredJob";
protected UnregisteredJobManagerJobMetricGroup() {
super(NoOpMetricRegistry.INSTANCE, new UnregisteredJobManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME);
}
}
/**
* A safe drop-in replacement for {@link TaskManagerMetricGroup}s.
*/
public static class UnregisteredTaskManagerMetricGroup extends TaskManagerMetricGroup {
private static final String DEFAULT_HOST_NAME = "UnregisteredHost";
private static final String DEFAULT_TASKMANAGER_ID = "0";
protected UnregisteredTaskManagerMetricGroup() {
super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, DEFAULT_TASKMANAGER_ID);
}
@Override
public TaskMetricGroup addTaskForJob(
final JobID jobId,
final String jobName,
final JobVertexID jobVertexId,
final ExecutionAttemptID executionAttemptId,
final String taskName,
final int subtaskIndex,
final int attemptNumber) {
return createUnregisteredTaskMetricGroup();
}
}
/**
* A safe drop-in replacement for {@link TaskManagerJobMetricGroup}s.
*/
public static class UnregisteredTaskManagerJobMetricGroup extends TaskManagerJobMetricGroup {
private static final JobID DEFAULT_JOB_ID = new JobID(0, 0);
private static final String DEFAULT_JOB_NAME = "UnregisteredJob";
protected UnregisteredTaskManagerJobMetricGroup() {
super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME);
}
@Override
public TaskMetricGroup addTask(
final JobVertexID jobVertexId,
final ExecutionAttemptID executionAttemptID,
final String taskName,
final int subtaskIndex,
final int attemptNumber) {
return createUnregisteredTaskMetricGroup();
}
}
/**
* A safe drop-in replacement for {@link TaskMetricGroup}s.
*/
public static class UnregisteredTaskMetricGroup extends TaskMetricGroup {
private static final JobVertexID DEFAULT_VERTEX_ID = new JobVertexID(0, 0);
private static final ExecutionAttemptID DEFAULT_ATTEMPT_ID = new ExecutionAttemptID(0, 0);
private static final String DEFAULT_TASK_NAME = "UnregisteredTask";
protected UnregisteredTaskMetricGroup() {
super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerJobMetricGroup(),
DEFAULT_VERTEX_ID, DEFAULT_ATTEMPT_ID, DEFAULT_TASK_NAME, 0, 0);
}
@Override
public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
return createUnregisteredOperatorMetricGroup();
}
}
/**
* A safe drop-in replacement for {@link OperatorMetricGroup}s.
*/
public static class UnregisteredOperatorMetricGroup extends OperatorMetricGroup {
private static final OperatorID DEFAULT_OPERATOR_ID = new OperatorID(0, 0);
private static final String DEFAULT_OPERATOR_NAME = "UnregisteredOperator";
protected UnregisteredOperatorMetricGroup() {
super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskMetricGroup(), DEFAULT_OPERATOR_ID, DEFAULT_OPERATOR_NAME);
}
}
}
......@@ -29,12 +29,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
......@@ -107,10 +109,22 @@ public class DataSinkTask<IT> extends AbstractInvokable {
LOG.debug(getLogString("Starting data sink operator"));
RuntimeContext ctx = createRuntimeContext();
final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
final Counter numRecordsIn;
{
Counter tmpNumRecordsIn;
try {
OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
ioMetricGroup.reuseInputMetricsForTask();
ioMetricGroup.reuseOutputMetricsForTask();
tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
tmpNumRecordsIn = new SimpleCounter();
}
numRecordsIn = tmpNumRecordsIn;
}
if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
((RichOutputFormat) this.format).setRuntimeContext(ctx);
LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
......
......@@ -27,12 +27,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
......@@ -102,12 +104,25 @@ public class DataSourceTask<OT> extends AbstractInvokable {
LOG.debug(getLogString("Starting data source operator"));
RuntimeContext ctx = createRuntimeContext();
Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter();
if (this.config.getNumberOfChainedStubs() == 0) {
((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
final Counter numRecordsOut;
{
Counter tmpNumRecordsOut;
try {
OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
ioMetricGroup.reuseInputMetricsForTask();
if (this.config.getNumberOfChainedStubs() == 0) {
ioMetricGroup.reuseOutputMetricsForTask();
}
tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
tmpNumRecordsOut = new SimpleCounter();
}
numRecordsOut = tmpNumRecordsOut;
}
Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
((RichInputFormat) this.format).setRuntimeContext(ctx);
......
......@@ -45,7 +45,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
......@@ -344,7 +344,7 @@ public class IndividualRestartsConcurrencyTest extends TestLogger {
1,
allVertices,
checkpointCoordinatorConfiguration,
new UnregisteredTaskMetricsGroup()));
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()));
final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
......
......@@ -29,7 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
......@@ -63,11 +63,11 @@ public class InputGateConcurrentTest {
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, numChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
partitions[i] = new PipelinedSubpartition(0, resultPartition);
......@@ -99,12 +99,12 @@ public class InputGateConcurrentTest {
0,
numChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
gate, i, new ResultPartitionID(), mock(ConnectionID.class),
connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
sources[i] = new RemoteChannelSource(channel);
......@@ -148,7 +148,7 @@ public class InputGateConcurrentTest {
0,
numChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0, local = 0; i < numChannels; i++) {
if (localOrRemote.get(i)) {
......@@ -158,14 +158,14 @@ public class InputGateConcurrentTest {
sources[i] = new PipelinedSubpartitionSource(psp);
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
}
else {
//remote channel
RemoteInputChannel channel = new RemoteInputChannel(
gate, i, new ResultPartitionID(), mock(ConnectionID.class),
connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
sources[i] = new RemoteChannelSource(channel);
......
......@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
......@@ -89,11 +89,11 @@ public class InputGateFairnessTest {
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
}
......@@ -142,11 +142,11 @@ public class InputGateFairnessTest {
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
}
......@@ -192,7 +192,7 @@ public class InputGateFairnessTest {
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final ConnectionManager connManager = createDummyConnectionManager();
......@@ -201,7 +201,7 @@ public class InputGateFairnessTest {
for (int i = 0; i < numChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
gate, i, new ResultPartitionID(), mock(ConnectionID.class),
connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
channels[i] = channel;
......@@ -247,7 +247,7 @@ public class InputGateFairnessTest {
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final ConnectionManager connManager = createDummyConnectionManager();
......@@ -257,7 +257,7 @@ public class InputGateFairnessTest {
for (int i = 0; i < numChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
gate, i, new ResultPartitionID(), mock(ConnectionID.class),
connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
channels[i] = channel;
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
......
......@@ -38,7 +38,7 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
......@@ -291,7 +291,7 @@ public class LocalInputChannelTest {
0,
1,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()
);
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
......@@ -318,7 +318,7 @@ public class LocalInputChannelTest {
partitionManager,
new TaskEventDispatcher(),
1, 1,
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
......@@ -370,7 +370,7 @@ public class LocalInputChannelTest {
new ResultPartitionID(),
partitionManager,
new TaskEventDispatcher(),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
channel.requestSubpartition(0);
......@@ -411,7 +411,7 @@ public class LocalInputChannelTest {
mock(TaskEventDispatcher.class),
initialAndMaxRequestBackoff._1(),
initialAndMaxRequestBackoff._2(),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
}
/**
......@@ -487,7 +487,7 @@ public class LocalInputChannelTest {
subpartitionIndex,
numberOfInputChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
// Set buffer pool
inputGate.setBufferPool(bufferPool);
......@@ -502,7 +502,7 @@ public class LocalInputChannelTest {
consumedPartitionIds[i],
partitionManager,
taskEventDispatcher,
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()));
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
}
this.numberOfInputChannels = numberOfInputChannels;
......
......@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
......@@ -270,7 +270,7 @@ public class RemoteInputChannelTest {
partitionId,
mock(ConnectionID.class),
connectionManager,
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
ch.onFailedPartitionRequest();
......@@ -290,7 +290,7 @@ public class RemoteInputChannelTest {
new ResultPartitionID(),
mock(ConnectionID.class),
connManager,
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
......@@ -401,6 +401,6 @@ public class RemoteInputChannelTest {
connectionManager,
initialAndMaxRequestBackoff._1(),
initialAndMaxRequestBackoff._2(),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
}
}
......@@ -43,7 +43,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
......@@ -80,7 +80,7 @@ public class SingleInputGateTest {
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, 2,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType());
......@@ -140,7 +140,7 @@ public class SingleInputGateTest {
resultId, ResultPartitionType.PIPELINED,
0, 2,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final BufferPool bufferPool = mock(BufferPool.class);
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
......@@ -149,12 +149,12 @@ public class SingleInputGateTest {
// Local
ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
// Unknown
ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
// Set channels
inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
......@@ -195,7 +195,7 @@ public class SingleInputGateTest {
ResultPartitionType.PIPELINED,
0,
1,
mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
......@@ -206,7 +206,7 @@ public class SingleInputGateTest {
partitionManager,
new TaskEventDispatcher(),
new LocalConnectionManager(),
0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
......@@ -236,7 +236,7 @@ public class SingleInputGateTest {
0,
1,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
InputChannel unknown = new UnknownInputChannel(
inputGate,
......@@ -246,7 +246,7 @@ public class SingleInputGateTest {
new TaskEventDispatcher(),
new LocalConnectionManager(),
0, 0,
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
......@@ -339,7 +339,7 @@ public class SingleInputGateTest {
gateDesc,
netEnv,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
......@@ -388,7 +388,7 @@ public class SingleInputGateTest {
0,
1,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
RemoteInputChannel remote = mock(RemoteInputChannel.class);
inputGate.setInputChannel(new IntermediateResultPartitionID(), remote);
......@@ -416,7 +416,7 @@ public class SingleInputGateTest {
0,
1,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
UnknownInputChannel unknown = mock(UnknownInputChannel.class);
final ResultPartitionID resultPartitionId = new ResultPartitionID();
......
......@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -60,7 +60,7 @@ public class TestSingleInputGate {
0,
numberOfInputChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
this.inputGate = spy(realGate);
......
......@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
......@@ -48,13 +48,13 @@ public class UnionInputGateTest {
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, 3,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final SingleInputGate ig2 = new SingleInputGate(
testTaskName, new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, 5,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
......
......@@ -203,7 +203,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
ResourceID.generate(),
taskManagerSystem,
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.<String>empty(),
false,
......
......@@ -64,6 +64,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManager;
......@@ -206,7 +207,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
mySubmittedJobGraphStore,
checkpointStateFactory,
jobRecoveryTimeout,
new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
Option.<String>empty());
jobManager = system.actorOf(jobManagerProps);
......@@ -217,7 +218,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
ResourceID.generate(),
system,
testingHighAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("taskmanager"),
true,
......@@ -381,7 +382,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
submittedJobGraphStore,
mock(CheckpointRecoveryFactory.class),
jobRecoveryTimeout,
new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
jobManager = system.actorOf(jobManagerProps);
......
......@@ -626,7 +626,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
TestingJobManager.class,
MemoryArchivist.class)._1();
......@@ -648,7 +648,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
system,
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
scala.Option.<String>empty(),
true,
......@@ -845,7 +845,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
......@@ -864,7 +864,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
......@@ -1057,7 +1057,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
......@@ -1076,7 +1076,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
......@@ -1172,7 +1172,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
......@@ -1191,7 +1191,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
......@@ -1285,7 +1285,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
......@@ -1307,7 +1307,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
......
......@@ -95,7 +95,7 @@ public class JobSubmitTest {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
......
......@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
......@@ -201,7 +202,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
submittedJobGraphStore,
checkpointRecoveryFactory,
AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
Option.<String>empty());
}
}
......@@ -78,7 +78,7 @@ public class TaskManagerMetricsTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
......
......@@ -86,7 +86,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testUserDefinedVariable() {
MetricRegistry registry = new NoOpMetricRegistry();
MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
......@@ -111,7 +111,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testUserDefinedVariableOnKeyGroup() {
MetricRegistry registry = new NoOpMetricRegistry();
MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key1 = "key1";
......@@ -142,7 +142,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testNameCollisionForKeyAfterGenericGroup() {
MetricRegistry registry = new NoOpMetricRegistry();
MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
......@@ -169,7 +169,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testNameCollisionForKeyAndValueAfterGenericGroup() {
MetricRegistry registry = new NoOpMetricRegistry();
MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
......@@ -195,7 +195,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testNameCollisionAfterKeyValueGroup() {
MetricRegistry registry = new NoOpMetricRegistry();
MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
......
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.junit.Test;
......@@ -34,7 +33,7 @@ import static org.junit.Assert.assertNotNull;
public class TaskIOMetricGroupTest {
@Test
public void testTaskIOMetricGroup() {
TaskMetricGroup task = new UnregisteredTaskMetricsGroup();
TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
TaskIOMetricGroup taskIO = task.getIOMetricGroup();
// test counter forwarding
......
......@@ -29,10 +29,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
......@@ -227,6 +227,6 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> {
@Override
public OperatorMetricGroup getMetricGroup() {
return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
}
}
......@@ -26,13 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.TaskContext;
......@@ -373,7 +372,7 @@ public abstract class BinaryOperatorTestBase<S extends Function, IN, OUT> extend
@Override
public OperatorMetricGroup getMetricGroup() {
return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
}
// --------------------------------------------------------------------------------------------
......
......@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.TaskContext;
......@@ -368,7 +369,7 @@ public abstract class DriverTestBase<S extends Function> extends TestLogger impl
@Override
public OperatorMetricGroup getMetricGroup() {
return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
}
// --------------------------------------------------------------------------------------------
......
......@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
......@@ -100,7 +101,7 @@ public class DummyEnvironment implements Environment {
@Override
public TaskMetricGroup getMetricGroup() {
return new UnregisteredTaskMetricsGroup();
return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
}
@Override
......
......@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
......@@ -281,7 +282,7 @@ public class MockEnvironment implements Environment {
@Override
public TaskMetricGroup getMetricGroup() {
return new UnregisteredTaskMetricsGroup();
return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
}
@Override
......
......@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.ResettableDriver;
......@@ -364,7 +365,7 @@ public abstract class UnaryOperatorTestBase<S extends Function, IN, OUT> extends
@Override
public OperatorMetricGroup getMetricGroup() {
return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
}
// --------------------------------------------------------------------------------------------
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.testutils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import java.util.UUID;
public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
private static final MetricRegistry EMPTY_REGISTRY = new NoOpMetricRegistry();
public UnregisteredTaskMetricsGroup() {
super(EMPTY_REGISTRY, new DummyJobMetricGroup(),
new JobVertexID(), new ExecutionAttemptID(), "testtask", 0, 0);
}
@Override
protected void addMetric(String name, Metric metric) {}
@Override
public MetricGroup addGroup(String name) {
return new UnregisteredMetricsGroup();
}
// ------------------------------------------------------------------------
private static class DummyTaskManagerMetricsGroup extends TaskManagerMetricGroup {
public DummyTaskManagerMetricsGroup() {
super(EMPTY_REGISTRY, "localhost", UUID.randomUUID().toString());
}
}
private static class DummyJobMetricGroup extends TaskManagerJobMetricGroup {
public DummyJobMetricGroup() {
super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
}
}
public static class DummyTaskIOMetricGroup extends TaskIOMetricGroup {
public DummyTaskIOMetricGroup() {
super(new UnregisteredTaskMetricsGroup());
}
}
public static class DummyOperatorMetricGroup extends OperatorMetricGroup {
public DummyOperatorMetricGroup() {
super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup(), new OperatorID(), "testoperator");
}
}
}
......@@ -97,7 +97,7 @@ public class ResourceManagerTest extends TestLogger {
highAvailabilityServices,
new HeartbeatServices(1000L, 10000L),
slotManager,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
jobLeaderIdService,
testingFatalErrorHandler);
......
......@@ -98,7 +98,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
......@@ -168,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
network,
numberOfSlots,
highAvailabilityServices,
new TaskManagerMetricGroup(new NoOpMetricRegistry(), connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString()));
new TaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString()));
taskManager = actorSystem.actorOf(tmProps);
......
......@@ -122,7 +122,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1;
......
......@@ -250,7 +250,7 @@ public class TaskManagerStartupTest extends TestLogger {
ResourceID.generate(),
null,
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.<String>empty(),
false,
......
......@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
......@@ -193,7 +193,7 @@ public class JvmExitOnFatalErrorTest {
new String[0]),
new FileCache(tmInfo.getTmpDirectories()),
tmInfo,
new UnregisteredTaskMetricsGroup(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
new NoOpResultPartitionConsumableNotifier(),
new NoOpPartitionProducerStateChecker(),
executor);
......
......@@ -33,12 +33,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
......@@ -155,7 +155,7 @@ public abstract class AbstractStreamOperator<OUT>
// --------------- Metrics ---------------------------
/** Metric group for the operator. */
protected transient MetricGroup metrics;
protected transient OperatorMetricGroup metrics;
protected transient LatencyGauge latencyGauge;
......@@ -191,7 +191,7 @@ public abstract class AbstractStreamOperator<OUT>
this.metrics = operatorMetricGroup;
} catch (Exception e) {
LOG.warn("An error occurred while instantiating task metrics.", e);
this.metrics = new UnregisteredMetricsGroup();
this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
this.output = output;
}
Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
......
......@@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
......@@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -70,6 +74,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class StreamInputProcessor<IN> {
private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
......@@ -169,7 +175,12 @@ public class StreamInputProcessor<IN> {
return false;
}
if (numRecordsIn == null) {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}
}
while (true) {
......
......@@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
......@@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
......@@ -72,6 +76,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class StreamTwoInputProcessor<IN1, IN2> {
private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
......@@ -201,7 +207,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
return false;
}
if (numRecordsIn == null) {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}
}
while (true) {
......
......@@ -21,11 +21,13 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
......@@ -426,7 +428,21 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
StreamStatusProvider streamStatusProvider,
OutputTag<T> outputTag) {
this.operator = operator;
this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
{
Counter tmpNumRecordsIn;
try {
OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
ioMetricGroup.reuseInputMetricsForTask();
ioMetricGroup.reuseOutputMetricsForTask();
tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
tmpNumRecordsIn = new SimpleCounter();
}
numRecordsIn = tmpNumRecordsIn;
}
this.streamStatusProvider = streamStatusProvider;
this.outputTag = outputTag;
}
......
......@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.junit.Test;
......@@ -101,7 +102,7 @@ public class RichAsyncFunctionTest {
};
final String taskName = "foobarTask";
final MetricGroup metricGroup = mock(MetricGroup.class);
final MetricGroup metricGroup = new UnregisteredMetricsGroup();
final int numberOfParallelSubtasks = 42;
final int indexOfSubtask = 43;
final int attemptNumber = 1337;
......
......@@ -35,8 +35,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
......@@ -658,7 +658,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
final Configuration taskConfiguration = new Configuration();
final ExecutionConfig executionConfig = new ExecutionConfig();
final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup();
final TaskMetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1);
......
......@@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
......@@ -272,7 +272,7 @@ public class InterruptSensitiveRestoreTest {
new String[0]),
new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
new TestingTaskManagerRuntimeInfo(),
new UnregisteredTaskMetricsGroup(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionProducerStateChecker.class),
mock(Executor.class));
......
......@@ -47,8 +47,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.query.KvStateRegistry;
......@@ -363,6 +363,6 @@ public class StreamMockEnvironment implements Environment {
@Override
public TaskMetricGroup getMetricGroup() {
return new UnregisteredTaskMetricsGroup();
return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
}
}
......@@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
......@@ -168,7 +168,7 @@ public class StreamTaskTerminationTest extends TestLogger {
new String[0]),
mock(FileCache.class),
taskManagerRuntimeInfo,
new UnregisteredTaskMetricsGroup(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionProducerStateChecker.class),
Executors.directExecutor());
......
......@@ -58,9 +58,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
......@@ -935,7 +935,7 @@ public class StreamTaskTest extends TestLogger {
libCache,
mock(FileCache.class),
new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}),
new UnregisteredTaskMetricsGroup(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
consumableNotifier,
partitionProducerStateChecker,
executor);
......
......@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
......@@ -244,7 +244,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
new String[0]),
new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
new TestingTaskManagerRuntimeInfo(),
new UnregisteredTaskMetricsGroup(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionProducerStateChecker.class),
Executors.directExecutor());
......
......@@ -146,7 +146,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
......
......@@ -218,7 +218,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
ResourceID.generate(),
taskManagerSystem,
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.<String>empty(),
false,
......
......@@ -282,7 +282,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
ResourceID.generate(),
tmActorSystem[i],
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.<String>empty(),
false,
......
......@@ -121,7 +121,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
......
......@@ -122,7 +122,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
......@@ -144,7 +144,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
new NoOpMetricRegistry(),
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
......
......@@ -257,7 +257,7 @@ public class YarnResourceManagerTest extends TestLogger {
rmLeaderElectionService = new TestingLeaderElectionService();
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
metricRegistry = new NoOpMetricRegistry();
metricRegistry = NoOpMetricRegistry.INSTANCE;
slotManager = new SlotManager(
new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()),
Time.seconds(10), Time.seconds(10), Time.minutes(1));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册