Commit c78b3c49 authored by zentol

[FLINK-3949] Collect Metrics in Runtime Operators

currentLowWatermark
lastCheckpointSize
numBytes(In/Out)(Local/Remote)
numRecords(In/Out)
numSplitsProcessed (Batch only)

This closes #2090
Parent 10495852
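The pattern applied throughout the changes below is uniform: each driver obtains a named Counter from its operator's MetricGroup and increments it on the hot path, counting inputs inline and outputs through a wrapping collector. A minimal sketch of that loop, using only API calls that appear in this diff (the sketch class itself is illustrative, not part of the commit):

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

// Illustrative sketch only; not part of this commit.
class DriverCountingSketch<T> {
	// Counters are created (or looked up) by name on the operator's metric group,
	// then incremented once per record inside the processing loop.
	void runCounted(MetricGroup operatorMetrics, MutableObjectIterator<T> input, Collector<T> output) throws Exception {
		final Counter numRecordsIn = operatorMetrics.counter("numRecordsIn");
		T record;
		while ((record = input.next()) != null) {
			numRecordsIn.inc(); // one increment per consumed record
			output.collect(record); // in the real drivers the output side is counted by CountingCollector
		}
	}
}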
......@@ -26,33 +26,27 @@ import org.apache.flink.metrics.MetricRegistry;
*/
public class IOMetricGroup extends AbstractMetricGroup {
private final Counter numBytesIn;
private final Counter numBytesOut;
private final Counter numRecordsIn;
private final Counter numRecordsOut;
private final Counter numBytesInLocal;
private final Counter numBytesInRemote;
public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) {
super(registry, parent.getScopeComponents());
this.numBytesIn = parent.counter("numBytesIn");
this.numBytesOut = parent.counter("numBytesOut");
this.numRecordsIn = parent.counter("numRecordsIn");
this.numRecordsOut = parent.counter("numRecordsOut");
this.numBytesInLocal = parent.counter("numBytesInLocal");
this.numBytesInRemote = parent.counter("numBytesInRemote");
}
public Counter getBytesInCounter() {
return numBytesIn;
}
public Counter getBytesOutCounter() {
return numBytesOut;
}
public Counter getRecordsInCounter() {
return numRecordsIn;
}
public Counter getRecordsOutCounter() {
return numRecordsOut;
}
public Counter getNumBytesInLocalCounter() {
return numBytesInLocal;
}
public Counter getNumBytesInRemoteCounter() {
return numBytesInRemote;
}
}
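Counter itself only needs to support inc(), inc(long), and registration by name for everything in this diff. A rough behavioral stand-in (an assumption for illustration; the actual Flink class may differ, e.g. in how concurrent reads by metric reporters are handled):

// Behavioral sketch; not the Flink implementation.
class CounterSketch {
	private long count;

	public void inc() {
		count++;
	}

	public void inc(long n) {
		count += n;
	}

	public long getCount() {
		return count;
	}
}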
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.api.reader;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
......@@ -131,11 +130,4 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
deserializer.setReporter(reporter);
}
}
@Override
public void setMetricGroup(IOMetricGroup metrics) {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
deserializer.instantiateMetrics(metrics);
}
}
}
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.api.reader;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
......@@ -54,8 +53,4 @@ public final class BufferReader extends AbstractReader {
public void setReporter(AccumulatorRegistry.Reporter reporter) {
}
@Override
public void setMetricGroup(IOMetricGroup metrics) {
}
}
......@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
import java.io.IOException;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.util.event.EventListener;
......@@ -58,11 +57,4 @@ public interface ReaderBase {
*/
void setReporter(AccumulatorRegistry.Reporter reporter);
/**
* Setter for the metric group.
*
* @param metrics metric group to set
*/
void setMetricGroup(IOMetricGroup metrics);
}
......@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataInputDeserializer;
......@@ -49,9 +47,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
private Buffer currentBuffer;
private AccumulatorRegistry.Reporter reporter;
private transient Counter numRecordsIn;
private transient Counter numBytesIn;
public AdaptiveSpanningRecordDeserializer() {
this.nonSpanningWrapper = new NonSpanningWrapper();
......@@ -101,9 +96,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
if (reporter != null) {
reporter.reportNumBytesIn(len);
}
if (numBytesIn != null) {
numBytesIn.inc(len);
}
if (len <= nonSpanningRemaining - 4) {
// we can get a full record from here
......@@ -112,9 +104,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
if (reporter != null) {
reporter.reportNumRecordsIn(1);
}
if (numRecordsIn != null) {
numRecordsIn.inc();
}
return (this.nonSpanningWrapper.remaining() == 0) ?
DeserializationResult.LAST_RECORD_FROM_BUFFER :
......@@ -142,9 +131,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
if (reporter != null) {
reporter.reportNumRecordsIn(1);
}
if (numRecordsIn != null) {
numRecordsIn.inc();
}
// move the remainder to the non-spanning wrapper
// this does not copy it, only sets the memory segment
......@@ -179,12 +165,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
this.spanningWrapper.setReporter(reporter);
}
@Override
public void instantiateMetrics(IOMetricGroup metrics) {
numBytesIn = metrics.getBytesInCounter();
numRecordsIn = metrics.getRecordsInCounter();
}
// -----------------------------------------------------------------------------------------------------------------
private static final class NonSpanningWrapper implements DataInputView {
......
......@@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.buffer.Buffer;
......@@ -71,11 +70,4 @@ public interface RecordDeserializer<T extends IOReadableWritable> {
* Setter for the reporter, e.g. for the number of records emitted and the number of bytes read.
*/
void setReporter(AccumulatorRegistry.Reporter reporter);
/**
* Instantiates all metrics.
*
* @param metrics metric group
*/
void instantiateMetrics(IOMetricGroup metrics);
}
......@@ -55,7 +55,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
private AccumulatorRegistry.Reporter reporter;
private transient Counter numRecordsOut;
private transient Counter numBytesOut;
public SpanningRecordSerializer() {
......@@ -94,10 +93,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
if (numBytesOut != null) {
numBytesOut.inc(len);
}
if (numRecordsOut != null) {
numRecordsOut.inc();
}
this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
......@@ -204,6 +199,5 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
@Override
public void instantiateMetrics(IOMetricGroup metrics) {
numBytesOut = metrics.getBytesOutCounter();
numRecordsOut = metrics.getRecordsOutCounter();
}
}
......@@ -22,8 +22,6 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataInputDeserializer;
......@@ -63,9 +61,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
private AccumulatorRegistry.Reporter reporter;
private Counter numRecordsIn;
private Counter numBytesIn;
public SpillingAdaptiveSpanningRecordDeserializer(String[] tmpDirectories) {
this.nonSpanningWrapper = new NonSpanningWrapper();
this.spanningWrapper = new SpanningWrapper(tmpDirectories);
......@@ -114,9 +109,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
if (reporter != null) {
reporter.reportNumBytesIn(len);
}
if (numBytesIn != null) {
numBytesIn.inc(len);
}
if (len <= nonSpanningRemaining - 4) {
// we can get a full record from here
......@@ -126,9 +118,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
if (reporter != null) {
reporter.reportNumRecordsIn(1);
}
if (numRecordsIn != null) {
numRecordsIn.inc();
}
int remaining = this.nonSpanningWrapper.remaining();
if (remaining > 0) {
......@@ -168,9 +157,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
if (reporter != null) {
reporter.reportNumRecordsIn(1);
}
if (numRecordsIn != null) {
numRecordsIn.inc();
}
// move the remainder to the non-spanning wrapper
// this does not copy it, only sets the memory segment
......@@ -202,12 +188,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
this.spanningWrapper.setReporter(reporter);
}
@Override
public void instantiateMetrics(IOMetricGroup metrics) {
numBytesIn = metrics.getBytesInCounter();
numRecordsIn = metrics.getRecordsInCounter();
}
// -----------------------------------------------------------------------------------------------------------------
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
......@@ -61,6 +62,8 @@ public abstract class InputChannel {
/** The maximum backoff (in ms). */
private final int maxBackoff;
protected final Counter numBytesIn;
/** The current backoff (in ms) */
private int currentBackoff;
......@@ -68,7 +71,8 @@ public abstract class InputChannel {
SingleInputGate inputGate,
int channelIndex,
ResultPartitionID partitionId,
Tuple2<Integer, Integer> initialAndMaxBackoff) {
Tuple2<Integer, Integer> initialAndMaxBackoff,
Counter numBytesIn) {
checkArgument(channelIndex >= 0);
......@@ -84,6 +88,8 @@ public abstract class InputChannel {
this.initialBackoff = initial;
this.maxBackoff = max;
this.currentBackoff = initial == 0 ? -1 : 0;
this.numBytesIn = numBytesIn;
}
// ------------------------------------------------------------------------
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
......@@ -67,10 +68,11 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
int channelIndex,
ResultPartitionID partitionId,
ResultPartitionManager partitionManager,
TaskEventDispatcher taskEventDispatcher) {
TaskEventDispatcher taskEventDispatcher,
IOMetricGroup metrics) {
this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
new Tuple2<Integer, Integer>(0, 0));
new Tuple2<Integer, Integer>(0, 0), metrics);
}
LocalInputChannel(
......@@ -79,9 +81,10 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
ResultPartitionID partitionId,
ResultPartitionManager partitionManager,
TaskEventDispatcher taskEventDispatcher,
Tuple2<Integer, Integer> initialAndMaxBackoff) {
Tuple2<Integer, Integer> initialAndMaxBackoff,
IOMetricGroup metrics) {
super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInLocalCounter());
this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
......@@ -165,6 +168,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
getNextLookAhead();
numBytesIn.inc(next.getSize());
return next;
}
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
......@@ -80,10 +81,11 @@ public class RemoteInputChannel extends InputChannel {
int channelIndex,
ResultPartitionID partitionId,
ConnectionID connectionId,
ConnectionManager connectionManager) {
ConnectionManager connectionManager,
IOMetricGroup metrics) {
this(inputGate, channelIndex, partitionId, connectionId, connectionManager,
new Tuple2<Integer, Integer>(0, 0));
new Tuple2<Integer, Integer>(0, 0), metrics);
}
public RemoteInputChannel(
......@@ -92,9 +94,10 @@ public class RemoteInputChannel extends InputChannel {
ResultPartitionID partitionId,
ConnectionID connectionId,
ConnectionManager connectionManager,
Tuple2<Integer, Integer> initialAndMaxBackoff) {
Tuple2<Integer, Integer> initialAndMaxBackoff,
IOMetricGroup metrics) {
super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter());
this.connectionId = checkNotNull(connectionId);
this.connectionManager = checkNotNull(connectionManager);
......@@ -148,6 +151,7 @@ public class RemoteInputChannel extends InputChannel {
throw new IOException("Queried input channel for data although non is available.");
}
numBytesIn.inc(buffer.getSize());
return buffer;
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import com.google.common.collect.Maps;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
......@@ -173,7 +174,8 @@ public class SingleInputGate implements InputGate {
IntermediateDataSetID consumedResultId,
int consumedSubpartitionIndex,
int numberOfInputChannels,
PartitionStateChecker partitionStateChecker) {
PartitionStateChecker partitionStateChecker,
IOMetricGroup metrics) {
this.owningTaskName = checkNotNull(owningTaskName);
this.jobId = checkNotNull(jobId);
......@@ -502,7 +504,8 @@ public class SingleInputGate implements InputGate {
JobID jobId,
ExecutionAttemptID executionId,
InputGateDeploymentDescriptor igdd,
NetworkEnvironment networkEnvironment) {
NetworkEnvironment networkEnvironment,
IOMetricGroup metrics) {
final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
......@@ -512,7 +515,8 @@ public class SingleInputGate implements InputGate {
final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, icdd.length, networkEnvironment.getPartitionStateChecker());
owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
icdd.length, networkEnvironment.getPartitionStateChecker(), metrics);
// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
......@@ -526,13 +530,16 @@ public class SingleInputGate implements InputGate {
inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
networkEnvironment.getPartitionManager(),
networkEnvironment.getTaskEventDispatcher(),
networkEnvironment.getPartitionRequestInitialAndMaxBackoff());
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
metrics
);
}
else if (partitionLocation.isRemote()) {
inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
partitionLocation.getConnectionId(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialAndMaxBackoff()
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
metrics
);
}
else if (partitionLocation.isUnknown()) {
......@@ -540,7 +547,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionManager(),
networkEnvironment.getTaskEventDispatcher(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialAndMaxBackoff()
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
metrics
);
}
else {
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
......@@ -47,6 +48,8 @@ public class UnknownInputChannel extends InputChannel {
/** Initial and maximum backoff (in ms) after failed partition requests. */
private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
private final IOMetricGroup metrics;
public UnknownInputChannel(
SingleInputGate gate,
int channelIndex,
......@@ -54,14 +57,16 @@ public class UnknownInputChannel extends InputChannel {
ResultPartitionManager partitionManager,
TaskEventDispatcher taskEventDispatcher,
ConnectionManager connectionManager,
Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff) {
Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff,
IOMetricGroup metrics) {
super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff);
super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff, null);
this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
this.connectionManager = checkNotNull(connectionManager);
this.partitionRequestInitialAndMaxBackoff = checkNotNull(partitionRequestInitialAndMaxBackoff);
this.metrics = checkNotNull(metrics);
}
@Override
......@@ -112,10 +117,10 @@ public class UnknownInputChannel extends InputChannel {
// ------------------------------------------------------------------------
public RemoteInputChannel toRemoteInputChannel(ConnectionID producerAddress) {
return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff);
return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff, metrics);
}
public LocalInputChannel toLocalInputChannel() {
return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff);
return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff, metrics);
}
}
......@@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashJoinIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
......@@ -63,13 +66,15 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
@Override
public void initialize() throws Exception {
TaskConfig config = this.taskContext.getTaskConfig();
final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
MutableObjectIterator<IT1> input1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
MutableObjectIterator<IT2> input2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
TypePairComparatorFactory<IT1, IT2> pairComparatorFactory =
this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
......@@ -164,8 +169,9 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
@Override
public void run() throws Exception {
final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
}
......
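CountingMutableObjectIterator is imported above from org.apache.flink.runtime.operators.util.metrics, but its source is not part of this diff. A sketch consistent with how the drivers use it — delegate to the wrapped iterator and bump the shared counter for every non-null record (the method bodies are assumptions reconstructed from usage):

import java.io.IOException;

import org.apache.flink.metrics.Counter;
import org.apache.flink.util.MutableObjectIterator;

// Sketch reconstructed from usage in this commit; not the actual source.
public class CountingMutableObjectIterator<IN> implements MutableObjectIterator<IN> {
	private final MutableObjectIterator<IN> iterator;
	private final Counter numRecordsIn;

	public CountingMutableObjectIterator(MutableObjectIterator<IN> iterator, Counter numRecordsIn) {
		this.iterator = iterator;
		this.numRecordsIn = numRecordsIn;
	}

	@Override
	public IN next(IN reuse) throws IOException {
		IN record = iterator.next(reuse);
		if (record != null) {
			numRecordsIn.inc(); // count each record handed to the driver
		}
		return record;
	}

	@Override
	public IN next() throws IOException {
		IN record = iterator.next();
		if (record != null) {
			numRecordsIn.inc();
		}
		return record;
	}
}

Note that both inputs of a two-input driver are wrapped with the same numRecordsIn counter (in1 and in2 above), so the metric reflects the total records consumed across inputs.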
......@@ -23,10 +23,13 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
......@@ -84,9 +87,10 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
final double driverMemFraction = config.getRelativeMemoryDriver();
final DriverStrategy ls = config.getDriverStrategy();
final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
// get serializers and comparators
final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
......@@ -147,8 +151,10 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
@Override
public void run() throws Exception {
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
final JoinTaskIterator<IT1, IT2, OT> outerJoinIterator = this.outerJoinIterator;
while (this.running && outerJoinIterator.callWithNextKey(joinStub, collector)) ;
......
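CountingCollector, likewise referenced throughout but not shown in this diff, presumably counts each record before forwarding it to the wrapped collector (a sketch, not the actual source):

import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

// Sketch reconstructed from usage in this commit; not the actual source.
public class CountingCollector<OUT> implements Collector<OUT> {
	private final Collector<OUT> collector;
	private final Counter numRecordsOut;

	public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
		this.collector = collector;
		this.numRecordsOut = numRecordsOut;
	}

	@Override
	public void collect(OUT record) {
		numRecordsOut.inc(); // one increment per emitted record
		collector.collect(record);
	}

	@Override
	public void close() {
		collector.close();
	}
}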
......@@ -23,6 +23,9 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
import org.apache.flink.util.Collector;
......@@ -91,12 +94,15 @@ public class AllGroupCombineDriver<IN, OUT> implements Driver<GroupCombineFuncti
LOG.debug("AllGroupCombine starting.");
}
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
TypeSerializer<IN> serializer = serializerFactory.getSerializer();
final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
final MutableObjectIterator<IN> in = new CountingMutableObjectIterator<>(this.taskContext.<IN>getInput(0), numRecordsIn);
final GroupCombineFunction<IN, OUT> reducer = this.taskContext.getStub();
final Collector<OUT> output = this.taskContext.getOutputCollector();
final Collector<OUT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
final ReusingMutableToRegularIteratorWrapper<IN> inIter = new ReusingMutableToRegularIteratorWrapper<IN>(in, serializer);
......
......@@ -20,6 +20,9 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.ReduceFunction;
......@@ -104,14 +107,19 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
LOG.debug(this.taskContext.formatLogString("AllReduce preprocessing done. Running Reducer code."));
}
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
final ReduceFunction<T> stub = this.taskContext.getStub();
final MutableObjectIterator<T> input = this.input;
final TypeSerializer<T> serializer = this.serializer;
final Collector<T> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
T val1;
if ((val1 = input.next()) == null) {
return;
}
numRecordsIn.inc();
if (objectReuseEnabled) {
// We only need two objects. The first reference stores results and is
......@@ -121,6 +129,7 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
T value = val1;
while (running && (val2 = input.next(val2)) != null) {
numRecordsIn.inc();
value = stub.reduce(value, val2);
// we must never read into the object returned
......@@ -132,14 +141,15 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
}
}
this.taskContext.getOutputCollector().collect(value);
collector.collect(value);
} else {
T val2;
while (running && (val2 = input.next()) != null) {
numRecordsIn.inc();
val1 = stub.reduce(val1, val2);
}
this.taskContext.getOutputCollector().collect(val1);
collector.collect(val1);
}
}
......
......@@ -676,7 +676,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
}
inputReaders[i].setReporter(reporter);
inputReaders[i].setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
currentReaderOffset += groupSize;
}
......
......@@ -20,7 +20,10 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.CoGroupFunction;
......@@ -93,9 +96,11 @@ public class CoGroupDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1,
if (config.getDriverStrategy() != DriverStrategy.CO_GROUP) {
throw new Exception("Unrecognized driver strategy for CoGoup driver: " + config.getDriverStrategy().name());
}
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
// get the key positions and types
final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
......@@ -144,8 +149,10 @@ public class CoGroupDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1,
@Override
public void run() throws Exception
{
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
final CoGroupTaskIterator<IT1, IT2> coGroupIterator = this.coGroupIterator;
while (this.running && coGroupIterator.next()) {
......
......@@ -20,6 +20,9 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.CrossFunction;
......@@ -194,9 +197,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " +
"First input is outer (blocking) side, second input is inner (spilling) side."));
}
final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
......@@ -213,7 +219,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
......@@ -259,9 +265,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " +
"First input is inner (spilling) side, second input is outer (blocking) side."));
}
final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
......@@ -277,7 +286,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
this.blockIter = blockVals;
final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
final T1 val1Reuse = serializer1.createInstance();
......@@ -322,9 +331,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " +
"First input is outer side, second input is inner (spilling) side."));
}
final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
......@@ -335,7 +347,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
this.spillIter = spillVals;
final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
final T1 val1Reuse = serializer1.createInstance();
......@@ -372,8 +384,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " +
"First input is inner (spilling) side, second input is outer side."));
}
final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
......@@ -384,7 +400,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
this.spillIter = spillVals;
final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
final T1 val1Reuse = serializer1.createInstance();
......
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
......@@ -27,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
......@@ -104,8 +106,11 @@ public class DataSinkTask<IT> extends AbstractInvokable {
// --------------------------------------------------------------------
LOG.debug(getLogString("Starting data sink operator"));
RuntimeContext ctx = createRuntimeContext();
final Counter numRecordsIn = ctx.getMetricGroup().counter("numRecordsIn");
if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
((RichOutputFormat) this.format).setRuntimeContext(createRuntimeContext());
((RichOutputFormat) this.format).setRuntimeContext(ctx);
LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
}
......@@ -174,6 +179,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
// work!
while (!this.taskCanceled && ((record = input.next(record)) != null)) {
numRecordsIn.inc();
format.writeRecord(record);
}
} else {
......@@ -181,6 +187,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
// work!
while (!this.taskCanceled && ((record = input.next()) != null)) {
numRecordsIn.inc();
format.writeRecord(record);
}
}
......@@ -349,8 +356,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
inputReader.setReporter(reporter);
inputReader.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
@SuppressWarnings({ "rawtypes" })
final MutableObjectIterator<?> iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());
......
......@@ -19,12 +19,14 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
......@@ -35,6 +37,7 @@ import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -97,8 +100,12 @@ public class DataSourceTask<OT> extends AbstractInvokable {
// --------------------------------------------------------------------
LOG.debug(getLogString("Starting data source operator"));
RuntimeContext ctx = createRuntimeContext();
Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut");
if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
((RichInputFormat) this.format).setRuntimeContext(createRuntimeContext());
((RichInputFormat) this.format).setRuntimeContext(ctx);
LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
((RichInputFormat) this.format).openInputFormat();
LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
......@@ -135,7 +142,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
LOG.debug(getLogString("Starting to read input from split " + split.toString()));
try {
final Collector<OT> output = this.output;
final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);
if (objectReuseEnabled) {
OT reuse = serializer.createInstance();
......@@ -165,6 +172,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
// close. We close here such that a regular close throwing an exception marks a task as failed.
format.close();
}
splitCounter.inc();
} // end for all input splits
// close the collector. if it is a chaining task collector, it will close its chained tasks
......
......@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
......@@ -83,22 +85,26 @@ public class FlatMapDriver<IT, OT> implements Driver<FlatMapFunction<IT, OT>, OT
@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
// cache references on the stack
final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
final FlatMapFunction<IT, OT> function = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
while (this.running && ((record = input.next(record)) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
} else {
IT record;
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
}
......
......@@ -19,6 +19,9 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.GroupReduceFunction;
......@@ -89,9 +92,11 @@ public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT,
if (config.getDriverStrategy() != DriverStrategy.SORTED_GROUP_REDUCE) {
throw new Exception("Unrecognized driver strategy for GroupReduce driver: " + config.getDriverStrategy().name());
}
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
this.comparator = this.taskContext.getDriverComparator(0);
this.input = this.taskContext.getInput(0);
this.input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);
ExecutionConfig executionConfig = taskContext.getExecutionConfig();
this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
......@@ -106,10 +111,11 @@ public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT,
if (LOG.isDebugEnabled()) {
LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. Running GroupReducer code."));
}
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
// cache references on the stack
final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
final ReusingKeyGroupedIterator<IT> iter = new ReusingKeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
......
......@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
......@@ -34,6 +35,8 @@ import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
......@@ -84,6 +87,8 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
@Override
public void prepare() throws Exception{
final TaskConfig config = this.taskContext.getTaskConfig();
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
// obtain task manager's memory manager and I/O manager
final MemoryManager memoryManager = this.taskContext.getMemoryManager();
......@@ -96,8 +101,8 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
// test minimum memory requirements
final DriverStrategy ls = config.getDriverStrategy();
final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
// get the key positions and types
final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
......@@ -209,8 +214,9 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
@Override
public void run() throws Exception {
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
final JoinTaskIterator<IT1, IT2, OT> joinIterator = this.joinIterator;
while (this.running && joinIterator.callWithNextKey(joinStub, collector));
......
......@@ -20,6 +20,8 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
......@@ -78,15 +80,18 @@ public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {
@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
// cache references on the stack
final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
final MapFunction<IT, OT> function = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
while (this.running && ((record = input.next(record)) != null)) {
numRecordsIn.inc();
output.collect(function.map(record));
}
}
......@@ -94,6 +99,7 @@ public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {
IT record = null;
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
output.collect(function.map(record));
}
}
......
......@@ -21,6 +21,9 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
import org.apache.flink.util.Collector;
......@@ -83,10 +86,12 @@ public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<I
@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
// cache references on the stack
final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);
final MapPartitionFunction<IT, OT> function = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
final ReusingMutableToRegularIteratorWrapper<IT> inIter = new ReusingMutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer());
......
......@@ -61,6 +61,7 @@ public class NoOpChainedDriver<IT> extends ChainedDriver<IT, IT> {
@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.outputCollector.collect(record);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
......
......@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
......@@ -75,18 +77,22 @@ public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> {
@Override
public void run() throws Exception {
// cache references on the stack
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
final MutableObjectIterator<T> input = this.taskContext.getInput(0);
final Collector<T> output = this.taskContext.getOutputCollector();
final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
T record = this.taskContext.<T>getInputSerializer(0).getSerializer().createInstance();
while (this.running && ((record = input.next(record)) != null)) {
numRecordsIn.inc();
output.collect(record);
}
} else {
T record;
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
output.collect(record);
}
......
......@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.ReduceFunction;
......@@ -104,13 +106,15 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
if (this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_PARTIAL_REDUCE) {
throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for reduce combiner.");
}
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
// instantiate the serializer / comparator
final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
this.comparator = this.taskContext.getDriverComparator(0);
this.serializer = serializerFactory.getSerializer();
this.reducer = this.taskContext.getStub();
this.output = this.taskContext.getOutputCollector();
this.output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
MemoryManager memManager = this.taskContext.getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(
......@@ -140,6 +144,8 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
LOG.debug("Combiner starting.");
}
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final MutableObjectIterator<T> in = this.taskContext.getInput(0);
final TypeSerializer<T> serializer = this.serializer;
......@@ -147,6 +153,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
T value = serializer.createInstance();
while (running && (value = in.next(value)) != null) {
numRecordsIn.inc();
// try writing to the sorter first
if (this.sorter.write(value)) {
......@@ -166,6 +173,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
else {
T value;
while (running && (value = in.next()) != null) {
numRecordsIn.inc();
// try writing to the sorter first
if (this.sorter.write(value)) {
......
......@@ -20,6 +20,8 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.ReduceFunction;
......@@ -106,6 +108,9 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
LOG.debug(this.taskContext.formatLogString("Reducer preprocessing done. Running Reducer code."));
}
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
// cache references on the stack
final MutableObjectIterator<T> input = this.input;
final TypeSerializer<T> serializer = this.serializer;
......@@ -113,7 +118,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
final ReduceFunction<T> function = this.taskContext.getStub();
final Collector<T> output = this.taskContext.getOutputCollector();
final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
// We only need two objects. The first reference stores results and is
......@@ -128,10 +133,12 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
// iterate over key groups
while (this.running && value != null) {
numRecordsIn.inc();
comparator.setReference(value);
// iterate within a key group
while ((reuse2 = input.next(reuse2)) != null) {
numRecordsIn.inc();
if (comparator.equalToReference(reuse2)) {
// same group, reduce
value = function.reduce(value, reuse2);
......@@ -163,11 +170,13 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
// iterate over key groups
while (this.running && value != null) {
numRecordsIn.inc();
comparator.setReference(value);
T res = value;
// iterate within a key group
while ((value = input.next()) != null) {
numRecordsIn.inc();
if (comparator.equalToReference(value)) {
// same group, reduce
res = function.reduce(res, value);
......
......@@ -19,6 +19,8 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
......@@ -58,18 +60,22 @@ public class UnionWithTempOperator<T> implements Driver<Function, T> {
@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
final Collector<T> output = this.taskContext.getOutputCollector();
final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
T reuse = this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance();
T record;
final MutableObjectIterator<T> input = this.taskContext.getInput(STREAMED_INPUT);
while (this.running && ((record = input.next(reuse)) != null)) {
numRecordsIn.inc();
output.collect(record);
}
final MutableObjectIterator<T> cache = this.taskContext.getInput(CACHED_INPUT);
while (this.running && ((record = cache.next(reuse)) != null)) {
numRecordsIn.inc();
output.collect(record);
}
}
......
......@@ -86,6 +86,7 @@ public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
// --------------------------------------------------------------------------------------------
@Override
public void collect(IT record) {
numRecordsIn.inc();
try {
if (base == null) {
base = objectReuseEnabled ? record : serializer.copy(record);
......
......@@ -22,12 +22,14 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
import java.util.Map;
......@@ -53,6 +55,10 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
protected boolean objectReuseEnabled = false;
protected MetricGroup metrics;
protected Counter numRecordsIn;
protected Counter numRecordsOut;
public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
......@@ -61,9 +67,11 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
{
this.config = config;
this.taskName = taskName;
this.outputCollector = outputCollector;
this.userCodeClassLoader = userCodeClassLoader;
this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
this.numRecordsIn = this.metrics.counter("numRecordsIn");
this.numRecordsOut = this.metrics.counter("numRecordsOut");
this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
Environment env = parent.getEnvironment();
......@@ -103,7 +111,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
@SuppressWarnings("unchecked")
public void setOutputCollector(Collector<?> outputCollector) {
this.outputCollector = (Collector<OT>) outputCollector;
this.outputCollector = new CountingCollector<>((Collector<OT>) outputCollector, numRecordsOut);
}
public Collector<OT> getOutputCollector() {
......
......@@ -76,6 +76,7 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.mapper.flatMap(record, this.outputCollector);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
......
......@@ -75,6 +75,7 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.outputCollector.collect(this.mapper.map(record));
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
......
......@@ -59,6 +59,7 @@ public class ChainedTerminationCriterionDriver<IT, OT> extends ChainedDriver<IT,
@Override
public void collect(IT record) {
numRecordsIn.inc();
agg.aggregate(1);
}
......
......@@ -167,6 +167,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
@Override
public void collect(IN record) {
numRecordsIn.inc();
// try writing to the sorter first
try {
if (this.sorter.write(record)) {
......
......@@ -163,6 +163,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
@Override
public void collect(IN record) {
this.numRecordsIn.inc();
// try writing to the sorter first
try {
if (this.sorter.write(record)) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.util.metrics;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;
/**
 * A {@link Collector} wrapper that increments a {@link Counter} for every record
 * before forwarding it to the wrapped collector.
 */
public class CountingCollector<OUT> implements Collector<OUT> {
private final Collector<OUT> collector;
private final Counter numRecordsOut;
public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
this.collector = collector;
this.numRecordsOut = numRecordsOut;
}
@Override
public void collect(OUT record) {
this.numRecordsOut.inc();
this.collector.collect(record);
}
@Override
public void close() {
this.collector.close();
}
}
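For reference, a minimal usage sketch of the new wrapper; the class and method names below are illustrative only, not part of this commit:
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
// Hypothetical usage sketch: wrap a driver's real output collector so every
// emitted record increments the operator's numRecordsOut counter.
public class CountingCollectorExample {
	public static <T> Collector<T> countOutputs(Collector<T> delegate, MetricGroup metrics) {
		Counter numRecordsOut = metrics.counter("numRecordsOut");
		// collect() increments the counter and then forwards to `delegate`;
		// close() is forwarded unchanged.
		return new CountingCollector<>(delegate, numRecordsOut);
	}
}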
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.util.metrics;
import org.apache.flink.metrics.Counter;
import java.util.Iterator;
/**
 * An {@link Iterable} wrapper whose iterators count every returned element
 * via the given {@link Counter}.
 */
public class CountingIterable<IN> implements Iterable<IN> {
private final Iterable<IN> iterable;
private final Counter numRecordsIn;
public CountingIterable(Iterable<IN> iterable, Counter numRecordsIn) {
this.iterable = iterable;
this.numRecordsIn = numRecordsIn;
}
@Override
public Iterator<IN> iterator() {
return new CountingIterator<>(this.iterable.iterator(), this.numRecordsIn);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.util.metrics;
import org.apache.flink.metrics.Counter;
import java.util.Iterator;
/**
 * An {@link Iterator} wrapper that increments a {@link Counter} for every
 * element returned by {@link #next()}.
 */
public class CountingIterator<IN> implements Iterator<IN> {
private final Iterator<IN> iterator;
private final Counter numRecordsIn;
public CountingIterator(Iterator<IN> iterator, Counter numRecordsIn) {
this.iterator = iterator;
this.numRecordsIn = numRecordsIn;
}
@Override
public boolean hasNext() {
return this.iterator.hasNext();
}
@Override
public IN next() {
numRecordsIn.inc();
return this.iterator.next();
}
@Override
public void remove() {
this.iterator.remove();
}
}
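Likewise, a hedged sketch of wrapping a key group with the iterable/iterator pair; the names below are illustrative:
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingIterable;
// Hypothetical usage sketch: hand the user function a counting view of one
// key group, so each element it pulls is recorded as an incoming record.
public class CountingIterableExample {
	public static <T> Iterable<T> countInputs(Iterable<T> keyGroup, Counter numRecordsIn) {
		// every iterator() call yields a CountingIterator over the same counter
		return new CountingIterable<>(keyGroup, numRecordsIn);
	}
}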
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.util.metrics;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.MutableObjectIterator;
import java.io.IOException;
/**
 * A {@link MutableObjectIterator} wrapper that increments a {@link Counter}
 * for every non-null record returned by either {@code next} variant.
 */
public class CountingMutableObjectIterator<IN> implements MutableObjectIterator<IN> {
private final MutableObjectIterator<IN> iterator;
private final Counter numRecordsIn;
public CountingMutableObjectIterator(MutableObjectIterator<IN> iterator, Counter numRecordsIn) {
this.iterator = iterator;
this.numRecordsIn = numRecordsIn;
}
@Override
public IN next(IN reuse) throws IOException {
IN next = iterator.next(reuse);
if (next != null) {
numRecordsIn.inc();
}
return next;
}
@Override
public IN next() throws IOException {
IN next = iterator.next();
if (next != null) {
numRecordsIn.inc();
}
return next;
}
}
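A sketch of counting at the source with the mutable-object variant, assuming a driver that drains its input through either next() overload; the example names are hypothetical:
import java.io.IOException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.apache.flink.util.MutableObjectIterator;
// Hypothetical usage sketch: wrapping the driver input counts records once at
// the source instead of incrementing inside every processing loop.
public class CountingInputExample {
	public static <T> int drain(MutableObjectIterator<T> input, Counter numRecordsIn)
			throws IOException {
		MutableObjectIterator<T> counted =
				new CountingMutableObjectIterator<>(input, numRecordsIn);
		int seen = 0;
		while (counted.next() != null) { // null signals end of input
			seen++;
		}
		return seen;
	}
}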
......@@ -319,7 +319,8 @@ public class Task implements Runnable {
for (int i = 0; i < this.inputGates.length; i++) {
SingleInputGate gate = SingleInputGate.create(
taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment);
taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment,
metricGroup.getIOMetricGroup());
this.inputGates[i] = gate;
inputGatesById.put(gate.getConsumedResultId(), gate);
......
......@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
......@@ -190,9 +189,5 @@ public class AbstractReaderTest {
public void setReporter(AccumulatorRegistry.Reporter reporter) {
}
@Override
public void setMetricGroup(IOMetricGroup metrics) {
}
}
}
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
......@@ -118,7 +119,7 @@ public class InputChannelTest {
ResultPartitionID partitionId,
Tuple2<Integer, Integer> initialAndMaxBackoff) {
super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter());
}
@Override
......
......@@ -43,6 +43,7 @@ import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -271,7 +272,8 @@ public class LocalInputChannelTest {
new ResultPartitionID(),
partitionManager,
mock(TaskEventDispatcher.class),
initialAndMaxRequestBackoff);
initialAndMaxRequestBackoff,
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
}
/**
......@@ -346,7 +348,8 @@ public class LocalInputChannelTest {
new IntermediateDataSetID(),
subpartitionIndex,
numberOfInputChannels,
mock(PartitionStateChecker.class));
mock(PartitionStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
// Set buffer pool
inputGate.setBufferPool(bufferPool);
......@@ -360,7 +363,8 @@ public class LocalInputChannelTest {
i,
consumedPartitionIds[i],
partitionManager,
taskEventDispatcher));
taskEventDispatcher,
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()));
}
this.numberOfInputChannels = numberOfInputChannels;
......
......@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.junit.Test;
import scala.Tuple2;
......@@ -247,7 +248,8 @@ public class RemoteInputChannelTest {
0,
partitionId,
mock(ConnectionID.class),
connectionManager);
connectionManager,
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
ch.onFailedPartitionRequest();
......@@ -266,7 +268,8 @@ public class RemoteInputChannelTest {
0,
new ResultPartitionID(),
mock(ConnectionID.class),
connManager);
connManager,
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
......@@ -301,6 +304,7 @@ public class RemoteInputChannelTest {
new ResultPartitionID(),
mock(ConnectionID.class),
connectionManager,
initialAndMaxRequestBackoff);
initialAndMaxRequestBackoff,
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
}
}
......@@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.junit.Test;
import scala.Tuple2;
......@@ -66,7 +67,7 @@ public class SingleInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final SingleInputGate inputGate = new SingleInputGate(
"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class));
"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
final TestInputChannel[] inputChannels = new TestInputChannel[]{
new TestInputChannel(inputGate, 0),
......@@ -113,7 +114,7 @@ public class SingleInputGateTest {
// Setup reader with one local and one unknown input channel
final IntermediateDataSetID resultId = new IntermediateDataSetID();
final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class));
final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
final BufferPool bufferPool = mock(BufferPool.class);
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
......@@ -122,12 +123,12 @@ public class SingleInputGateTest {
// Local
ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher);
InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
// Unknown
ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0));
InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
// Set channels
inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
......@@ -168,7 +169,7 @@ public class SingleInputGateTest {
new IntermediateDataSetID(),
0,
1,
mock(PartitionStateChecker.class));
mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
......@@ -179,7 +180,7 @@ public class SingleInputGateTest {
partitionManager,
new TaskEventDispatcher(),
new LocalConnectionManager(),
new Tuple2<>(0, 0));
new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
......@@ -208,7 +209,8 @@ public class SingleInputGateTest {
new IntermediateDataSetID(),
0,
1,
mock(PartitionStateChecker.class));
mock(PartitionStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
InputChannel unknown = new UnknownInputChannel(
inputGate,
......@@ -217,7 +219,8 @@ public class SingleInputGateTest {
new ResultPartitionManager(),
new TaskEventDispatcher(),
new LocalConnectionManager(),
new Tuple2<>(0, 0));
new Tuple2<Integer, Integer>(0, 0),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
......
......@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.util.event.EventListener;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -59,7 +60,7 @@ public class TestSingleInputGate {
checkArgument(numberOfInputChannels >= 1);
SingleInputGate realGate = new SingleInputGate(
"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class));
"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
this.inputGate = spy(realGate);
......
......@@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
......@@ -43,8 +44,8 @@ public class UnionInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final String testTaskName = "Test Task";
final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class));
final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class));
final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
......
......@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.metrics.groups.JobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.groups.TaskMetricGroup;
......@@ -65,4 +66,19 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
}
}
public static class DummyIOMetricGroup extends IOMetricGroup {
public DummyIOMetricGroup() {
super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup());
}
@Override
protected void addMetric(String name, Metric metric) {
}
@Override
public MetricGroup addGroup(String name) {
return new UnregisteredMetricsGroup();
}
}
}
......@@ -25,10 +25,12 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
......@@ -102,10 +104,10 @@ public abstract class AbstractStreamOperator<OUT>
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
this.container = containingTask;
this.config = config;
this.output = output;
String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
......@@ -334,4 +336,30 @@ public abstract class AbstractStreamOperator<OUT>
public void disableInputCopy() {
this.inputCopyDisabled = true;
}
/**
 * Wrapping {@link Output} that increments a {@link Counter} for every emitted
 * record; watermarks are forwarded without being counted.
 */
public class CountingOutput implements Output<StreamRecord<OUT>> {
private final Output<StreamRecord<OUT>> output;
private final Counter numRecordsOut;
public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
this.output = output;
this.numRecordsOut = counter;
}
@Override
public void emitWatermark(Watermark mark) {
output.emitWatermark(mark);
}
@Override
public void collect(StreamRecord<OUT> record) {
numRecordsOut.inc();
output.collect(record);
}
@Override
public void close() {
output.close();
}
}
}
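CountingOutput forwards watermarks without touching the counter, so numRecordsOut reflects data records only. A hedged sketch of the wiring that setup() performs, usable from any operator subclass; the example class is illustrative:
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
// Hypothetical sketch: any raw output can be wrapped the same way setup()
// does above, since CountingOutput is an inner class of the operator base.
public abstract class CountingOutputExample<OUT> extends AbstractStreamOperator<OUT> {
	protected void wireCountingOutput(Output<StreamRecord<OUT>> rawOutput) {
		this.output = new CountingOutput(rawOutput, getMetricGroup().counter("numRecordsOut"));
	}
}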
......@@ -51,6 +51,10 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
this.chainingStrategy = ChainingStrategy.HEAD;
}
public void run(final Object lockingObject) throws Exception {
run(lockingObject, output);
}
public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
......
......@@ -22,6 +22,8 @@ import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
......@@ -81,6 +83,8 @@ public class StreamInputProcessor<IN> {
private final DeserializationDelegate<StreamElement> deserializationDelegate;
private Counter numRecordsIn;
@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
EventListener<CheckpointBarrier> checkpointListener,
......@@ -133,6 +137,9 @@ public class StreamInputProcessor<IN> {
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
numRecordsIn = streamOperator.getMetricGroup().counter("numRecordsIn");
}
while (true) {
if (currentRecordDeserializer != null) {
......@@ -166,6 +173,7 @@ public class StreamInputProcessor<IN> {
// now we can do the actual processing
StreamRecord<IN> record = recordOrWatermark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
......@@ -211,9 +219,12 @@ public class StreamInputProcessor<IN> {
* @param metrics metric group
*/
public void setMetricGroup(IOMetricGroup metrics) {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
deserializer.instantiateMetrics(metrics);
}
metrics.gauge("currentLowWatermark", new Gauge<Long>() {
@Override
public Long getValue() {
return lastEmittedWatermark;
}
});
}
public void cleanup() throws IOException {
......
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
......@@ -287,9 +288,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
* @param metrics metric group
*/
public void setMetricGroup(IOMetricGroup metrics) {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
deserializer.instantiateMetrics(metrics);
}
metrics.gauge("currentLowWatermark", new Gauge<Long>() {
@Override
public Long getValue() {
return Math.min(lastEmittedWatermark1, lastEmittedWatermark2);
}
});
}
public void cleanup() throws IOException {
......
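Both input processors register the watermark gauge the same way: the metrics system pulls the value at report time, so the record-processing path never calls into the registry. A minimal standalone sketch of that pattern; the class and method names are hypothetical:
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
// Hypothetical sketch of the pull-based gauge pattern: the hot path only
// updates a field, and the registry reads it when a report is due.
class WatermarkGaugeExample {
	private volatile long lastEmittedWatermark = Long.MIN_VALUE;
	void register(MetricGroup metrics) {
		metrics.gauge("currentLowWatermark", new Gauge<Long>() {
			@Override
			public Long getValue() {
				return lastEmittedWatermark;
			}
		});
	}
	void onWatermark(long timestamp) {
		lastEmittedWatermark = timestamp;
	}
}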
......@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
......@@ -298,14 +299,17 @@ public class OperatorChain<OUT> {
private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
protected final OneInputStreamOperator<T, ?> operator;
protected final Counter numRecordsIn;
public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
this.operator = operator;
this.numRecordsIn = operator.getMetricGroup().counter("numRecordsIn");
}
@Override
public void collect(StreamRecord<T> record) {
try {
numRecordsIn.inc();
operator.setKeyContextElement1(record);
operator.processElement(record);
}
......@@ -347,6 +351,7 @@ public class OperatorChain<OUT> {
@Override
public void collect(StreamRecord<T> record) {
try {
numRecordsIn.inc();
StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
operator.setKeyContextElement1(copy);
operator.processElement(copy);
......
......@@ -53,7 +53,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
@Override
protected void run() throws Exception {
headOperator.run(getCheckpointLock(), getHeadOutput());
headOperator.run(getCheckpointLock());
}
@Override
......
......@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -56,13 +57,27 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
this.headOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
this.headOperator = new RecordPusher<>();
this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
}
private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
@Override
public void processElement(StreamRecord<IN> record) throws Exception {
output.collect(record);
}
@Override
public void processWatermark(Watermark mark) {
// ignore
}
}
private static class IterationTailOutput<IN> implements Output<StreamRecord<IN>> {
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private final BlockingQueue<StreamRecord<IN>> dataChannel;
......@@ -70,25 +85,32 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
private final boolean shouldWait;
RecordPusher(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
IterationTailOutput(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
this.dataChannel = dataChannel;
this.iterationWaitTime = iterationWaitTime;
this.shouldWait = iterationWaitTime > 0;
}
@Override
public void processElement(StreamRecord<IN> record) throws Exception {
if (shouldWait) {
dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
}
else {
dataChannel.put(record);
public void emitWatermark(Watermark mark) {
}
@Override
public void collect(StreamRecord<IN> record) {
try {
if (shouldWait) {
dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
}
else {
dataChannel.put(record);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void processWatermark(Watermark mark) {
// ignore
public void close() {
}
}
}
......@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
......@@ -154,6 +155,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
private long recoveryTimestamp;
private long lastCheckpointSize = 0;
// ------------------------------------------------------------------------
// Life cycle methods for specific implementations
// ------------------------------------------------------------------------
......@@ -194,6 +197,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// allow trigger tasks to be removed if all timers for that timestamp are removed by user
timerService.setRemoveOnCancelPolicy(true);
getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() {
@Override
public Long getValue() {
return StreamTask.this.lastCheckpointSize;
}
});
// task specific initialization
init();
......@@ -538,6 +548,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
if (allStates.isEmpty()) {
getEnvironment().acknowledgeCheckpoint(checkpointId);
} else if (!hasAsyncStates) {
this.lastCheckpointSize = allStates.getStateSize();
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
} else {
// start a Thread that does the asynchronous materialization and
......@@ -572,6 +583,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
StreamTaskStateList allStates = new StreamTaskStateList(states);
StreamTask.this.lastCheckpointSize = allStates.getStateSize();
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
}
......