提交 344fe94d 编写于 作者: Z zhuhaifengleon 提交者: Stephan Ewen

[FLINK-4923] [metrics] Expose Task's input/output buffer queue lengths and...

[FLINK-4923] [metrics] Expose Task's input/output buffer queue lengths and bufferPool usage as metrics

This closes #2693
上级 5b54009e
...@@ -220,4 +220,9 @@ class PipelinedSubpartition extends ResultSubpartition { ...@@ -220,4 +220,9 @@ class PipelinedSubpartition extends ResultSubpartition {
throw new IllegalStateException("Already registered listener."); throw new IllegalStateException("Already registered listener.");
} }
} }
/**
 * Reports how many buffers this pipelined subpartition currently holds in {@code buffers}.
 *
 * @return the current size of {@code buffers}
 */
@Override
public int getNumberOfQueuedBuffers() {
    final int queuedBuffers = buffers.size();
    return queuedBuffers;
}
} }
...@@ -220,6 +220,10 @@ public class ResultPartition implements BufferPoolOwner { ...@@ -220,6 +220,10 @@ public class ResultPartition implements BufferPoolOwner {
return bufferPool; return bufferPool;
} }
/**
 * Gives access to the buffer pool referenced by this result partition.
 *
 * @return the value of the {@code bufferPool} field
 */
public BufferPool getBufferPool() {
    return this.bufferPool;
}
public int getTotalNumberOfBuffers() { public int getTotalNumberOfBuffers() {
return totalNumberOfBuffers; return totalNumberOfBuffers;
} }
...@@ -228,6 +232,16 @@ public class ResultPartition implements BufferPoolOwner { ...@@ -228,6 +232,16 @@ public class ResultPartition implements BufferPoolOwner {
return totalNumberOfBytes; return totalNumberOfBytes;
} }
/**
 * Sums the queued-buffer counts reported by every subpartition of this result partition.
 *
 * @return the total of {@code getNumberOfQueuedBuffers()} over all entries of
 *         {@code subpartitions}
 */
public int getNumberOfQueuedBuffers() {
    int queued = 0;
    for (ResultSubpartition current : subpartitions) {
        queued = queued + current.getNumberOfQueuedBuffers();
    }
    return queued;
}
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
/** /**
......
...@@ -83,4 +83,6 @@ public abstract class ResultSubpartition { ...@@ -83,4 +83,6 @@ public abstract class ResultSubpartition {
abstract public boolean isReleased(); abstract public boolean isReleased();
/**
 * Returns the number of buffers currently queued in this subpartition.
 *
 * @return the queue length as reported by the concrete subpartition implementation
 */
abstract public int getNumberOfQueuedBuffers();
} }
...@@ -227,4 +227,9 @@ class SpillableSubpartition extends ResultSubpartition { ...@@ -227,4 +227,9 @@ class SpillableSubpartition extends ResultSubpartition {
getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
spillWriter != null); spillWriter != null);
} }
/**
 * Reports how many buffers this spillable subpartition currently holds in {@code buffers}.
 *
 * @return the current size of {@code buffers}
 */
@Override
public int getNumberOfQueuedBuffers() {
    final int queuedBuffers = buffers.size();
    return queuedBuffers;
}
} }
...@@ -212,6 +212,10 @@ public class SingleInputGate implements InputGate { ...@@ -212,6 +212,10 @@ public class SingleInputGate implements InputGate {
return bufferPool; return bufferPool;
} }
/**
 * Gives access to the buffer pool referenced by this input gate.
 *
 * @return the value of the {@code bufferPool} field; callers should expect {@code null},
 *         since {@code getPageSize()} in this class null-checks the same field
 */
public BufferPool getBufferPool() {
    return this.bufferPool;
}
@Override @Override
public int getPageSize() { public int getPageSize() {
if (bufferPool != null) { if (bufferPool != null) {
...@@ -222,6 +226,26 @@ public class SingleInputGate implements InputGate { ...@@ -222,6 +226,26 @@ public class SingleInputGate implements InputGate {
} }
} }
/**
 * Counts the buffers queued across all remote input channels of this gate.
 *
 * <p>The count is attempted up to three times; any exception thrown during an attempt
 * discards that attempt. If no attempt completes, 0 is returned to signal "unknown".
 * Channels that are not {@link RemoteInputChannel}s do not contribute to the count.
 *
 * @return the summed queue length of the remote channels, or 0 if it could not be determined
 */
public int getNumberOfQueuedBuffers() {
    int attemptsLeft = 3;
    while (attemptsLeft-- > 0) {
        try {
            int queued = 0;
            for (InputChannel candidate : inputChannels.values()) {
                if (!(candidate instanceof RemoteInputChannel)) {
                    continue;
                }
                queued += ((RemoteInputChannel) candidate).getNumberOfQueuedBuffers();
            }
            return queued;
        }
        catch (Exception ignored) {
            // Best effort only: presumably the channel map can change while it is being
            // iterated (TODO confirm), so a failed attempt is simply retried.
        }
    }
    // every attempt failed: report 0 for "unknown"
    return 0;
}
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Setup/Life-cycle // Setup/Life-cycle
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
......
...@@ -19,8 +19,13 @@ ...@@ -19,8 +19,13 @@
package org.apache.flink.runtime.metrics.groups; package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.taskmanager.Task;
/** /**
* Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
...@@ -36,6 +41,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { ...@@ -36,6 +41,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Meter numBytesInRateRemote; private final Meter numBytesInRateRemote;
private final Meter numBytesOutRate; private final Meter numBytesOutRate;
private final MetricGroup buffers;
public TaskIOMetricGroup(TaskMetricGroup parent) { public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent); super(parent);
...@@ -45,6 +52,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { ...@@ -45,6 +52,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60)); this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60)); this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60)); this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
this.buffers = addGroup("buffers");
} }
public Counter getNumBytesOutCounter() { public Counter getNumBytesOutCounter() {
...@@ -70,4 +79,116 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { ...@@ -70,4 +79,116 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
public Meter getNumBytesOutRateMeter() { public Meter getNumBytesOutRateMeter() {
return numBytesOutRate; return numBytesOutRate;
} }
/**
 * Gives access to the {@code "buffers"} sub-group that the constructor adds to this group.
 *
 * @return the metric group under which the buffer-related metrics are registered
 */
public MetricGroup getBuffersGroup() {
    return this.buffers;
}
// ------------------------------------------------------------------------
// metrics of Buffers group
// ------------------------------------------------------------------------
/**
 * Gauge reporting the number of queued input buffers of a task, summed over all of the
 * task's input gates.
 */
public static final class InputBuffersGauge implements Gauge<Integer> {

    /** Task whose input gates are inspected each time the gauge is read. */
    private final Task task;

    public InputBuffersGauge(Task task) {
        this.task = task;
    }

    /**
     * @return the sum of {@code getNumberOfQueuedBuffers()} over all input gates of the task
     */
    @Override
    public Integer getValue() {
        int queued = 0;
        for (SingleInputGate gate : task.getAllInputGates()) {
            queued = queued + gate.getNumberOfQueuedBuffers();
        }
        return queued;
    }
}
/**
 * Gauge reporting the number of queued output buffers of a task, summed over all result
 * partitions the task produces.
 */
public static final class OutputBuffersGauge implements Gauge<Integer> {

    /** Task whose produced partitions are inspected each time the gauge is read. */
    private final Task task;

    public OutputBuffersGauge(Task task) {
        this.task = task;
    }

    /**
     * @return the sum of {@code getNumberOfQueuedBuffers()} over all produced partitions
     */
    @Override
    public Integer getValue() {
        int queued = 0;
        for (ResultPartition partition : task.getProducedPartitions()) {
            queued = queued + partition.getNumberOfQueuedBuffers();
        }
        return queued;
    }
}
/**
 * Gauge reporting the input buffer pool usage of a task.
 *
 * <p>The usage is computed over all input gates of the task as
 * {@code (totalPoolSize - totalAvailable) / totalPoolSize}. Input gates that do not have a
 * buffer pool (yet) are left out of the computation.
 */
public static final class InputBufferPoolUsageGauge implements Gauge<Float> {

    /** Task whose input gates are inspected each time the gauge is read. */
    private final Task task;

    public InputBufferPoolUsageGauge(Task task) {
        this.task = task;
    }

    /**
     * @return the fraction of pooled buffers that are not available, or {@code 0.0f} when
     *         the combined pool size is zero (including when no gate has a pool yet)
     */
    @Override
    public Float getValue() {
        int availableBuffers = 0;
        int bufferPoolSize = 0;

        for (SingleInputGate inputGate : task.getAllInputGates()) {
            // The gauge is registered when the Task is constructed, and SingleInputGate
            // itself treats its buffer pool as nullable. Skip gates without a pool so that
            // an early metrics poll does not fail with a NullPointerException.
            if (inputGate.getBufferPool() == null) {
                continue;
            }
            availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments();
            bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
        }

        if (bufferPoolSize != 0) {
            return ((float) (bufferPoolSize - availableBuffers)) / bufferPoolSize;
        } else {
            // avoid a division by zero when there is nothing to measure
            return 0.0f;
        }
    }
}
/**
 * Gauge reporting the output buffer pool usage of a task.
 *
 * <p>The usage is computed over all result partitions produced by the task as
 * {@code (totalPoolSize - totalAvailable) / totalPoolSize}. Partitions that do not have a
 * buffer pool are left out of the computation.
 */
public static final class OutputBufferPoolUsageGauge implements Gauge<Float> {

    /** Task whose produced partitions are inspected each time the gauge is read. */
    private final Task task;

    public OutputBufferPoolUsageGauge(Task task) {
        this.task = task;
    }

    /**
     * @return the fraction of pooled buffers that are not available, or {@code 0.0f} when
     *         the combined pool size is zero (including when no partition has a pool)
     */
    @Override
    public Float getValue() {
        int availableBuffers = 0;
        int bufferPoolSize = 0;

        for (ResultPartition resultPartition : task.getProducedPartitions()) {
            // The gauge is registered when the Task is constructed; presumably a partition
            // may not have its pool assigned at that point (TODO confirm). Skip such
            // partitions instead of failing the metrics poll with a NullPointerException,
            // mirroring the input-side usage gauge.
            if (resultPartition.getBufferPool() == null) {
                continue;
            }
            availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments();
            bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
        }

        if (bufferPoolSize != 0) {
            return ((float) (bufferPoolSize - availableBuffers)) / bufferPoolSize;
        } else {
            // avoid a division by zero when there is nothing to measure
            return 0.0f;
        }
    }
}
} }
...@@ -26,6 +26,7 @@ import org.apache.flink.api.common.cache.DistributedCache; ...@@ -26,6 +26,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
...@@ -57,6 +58,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; ...@@ -57,6 +58,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.state.TaskStateHandles;
...@@ -353,6 +355,15 @@ public class Task implements Runnable, TaskActions { ...@@ -353,6 +355,15 @@ public class Task implements Runnable, TaskActions {
// finally, create the executing thread, but do not start it // finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
// add metrics for buffers
if (this.metrics != null && this.metrics.getIOMetricGroup() != null) {
MetricGroup bufferMetrics = this.metrics.getIOMetricGroup().getBuffersGroup();
bufferMetrics.gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
bufferMetrics.gauge("outputQueueLength", new TaskIOMetricGroup.OutputBuffersGauge(this));
bufferMetrics.gauge("inPoolUsage", new TaskIOMetricGroup.InputBufferPoolUsageGauge(this));
bufferMetrics.gauge("outPoolUsage", new TaskIOMetricGroup.OutputBufferPoolUsageGauge(this));
}
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
......
...@@ -44,19 +44,14 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; ...@@ -44,19 +44,14 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.util.SerializedValue; import org.apache.flink.util.SerializedValue;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.net.URL; import java.net.URL;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册