提交 fff04bfb 编写于 作者: N Nico Kruber 提交者: Aljoscha Krettek

[FLINK-5090] Fixups for "Add metrics for details about inbound/outbound network queues"

This closes #3348.
上级 4795ce8f
......@@ -640,7 +640,7 @@ Thus, in order to infer the metric identifier:
<td>The number of allocated memory segments.</td>
</tr>
<tr>
<th rowspan="4">Task</th>
<th rowspan="8">Task</th>
<td rowspan="4">buffers</td>
<td>inputQueueLength</td>
<td>The number of queued input buffers.</td>
......@@ -657,6 +657,24 @@ Thus, in order to infer the metric identifier:
<td>outPoolUsage</td>
<td>An estimate of the output buffers usage.</td>
</tr>
<tr>
<td rowspan="4">Network.&lt;Input|Output&gt;.&lt;gate&gt;<br />
<strong>(only available if <tt>taskmanager.net.detailed-metrics</tt> config option is set)</strong></td>
<td>totalQueueLen</td>
<td>Total number of queued buffers in all input/output channels.</td>
</tr>
<tr>
<td>minQueueLen</td>
<td>Minimum number of queued buffers in all input/output channels.</td>
</tr>
<tr>
<td>maxQueueLen</td>
<td>Maximum number of queued buffers in all input/output channels.</td>
</tr>
<tr>
<td>avgQueueLen</td>
<td>Average number of queued buffers in all input/output channels.</td>
</tr>
</tbody>
</table>
......
......@@ -250,12 +250,6 @@ public final class ConfigConstants {
@Deprecated
public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
/**
* Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths
*/
@PublicEvolving
public static final String NETWORK_DETAILED_METRICS_KEY = "taskmanager.net.detailed-metrics";
/**
* Config parameter defining the size of memory buffers used by the network stack and the memory manager.
*
......
......@@ -115,6 +115,14 @@ public class TaskManagerOptions {
key("taskmanager.net.memory.extra-buffers-per-gate")
.defaultValue(8);
/**
* Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
* lengths.
*/
public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
key("taskmanager.net.detailed-metrics")
.defaultValue(false);
// ------------------------------------------------------------------------
// Task Options
// ------------------------------------------------------------------------
......
......@@ -221,10 +221,20 @@ public class ResultPartition implements BufferPoolOwner {
return bufferPool;
}
/**
* Returns the total number of processed network buffers since initialization.
*
* @return overall number of processed network buffers
*/
public int getTotalNumberOfBuffers() {
return totalNumberOfBuffers;
}
/**
* Returns the total size of processed network buffers since initialization.
*
* @return overall size of processed network buffers
*/
public long getTotalNumberOfBytes() {
return totalNumberOfBytes;
}
......
......@@ -23,16 +23,13 @@ import org.apache.flink.metrics.MetricGroup;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Collects metrics of a result partition.
*/
public class ResultPartitionMetrics {
private final ResultPartition partition;
private int lastMin = -1;
private int lastMax = -1;
private float lastAvg = -1.0f;
// ------------------------------------------------------------------------
private ResultPartitionMetrics(ResultPartition partition) {
......@@ -43,58 +40,93 @@ public class ResultPartitionMetrics {
// these methods are package private to make access from the nested classes faster
int refreshAndGetMin() {
int min;
if ((min = lastMin) == -1) {
refresh();
min = lastMin;
}
/**
* Iterates over all sub-partitions and collects the total number of queued buffers in a
* best-effort way.
*
* @return total number of queued buffers
*/
long refreshAndGetTotal() {
long total = 0;
lastMin = -1;
return min;
for (ResultSubpartition part : partition.getAllPartitions()) {
total += part.unsynchronizedGetNumberOfQueuedBuffers();
}
int refreshAndGetMax() {
int max;
if ((max = lastMax) == -1) {
refresh();
max = lastMax;
return total;
}
lastMax = -1;
return max;
/**
* Iterates over all sub-partitions and collects the minimum number of queued buffers in a
* sub-partition in a best-effort way.
*
* @return minimum number of queued buffers per sub-partition (<tt>0</tt> if sub-partitions exist)
*/
int refreshAndGetMin() {
int min = Integer.MAX_VALUE;
ResultSubpartition[] allPartitions = partition.getAllPartitions();
if (allPartitions.length == 0) {
// meaningful value when no channels exist:
return 0;
}
float refreshAndGetAvg() {
float avg;
if ((avg = lastAvg) < 0.0f) {
refresh();
avg = lastAvg;
for (ResultSubpartition part : allPartitions) {
int size = part.unsynchronizedGetNumberOfQueuedBuffers();
min = Math.min(min, size);
}
lastAvg = -1.0f;
return avg;
return min;
}
private void refresh() {
int min = Integer.MAX_VALUE;
/**
* Iterates over all sub-partitions and collects the maximum number of queued buffers in a
* sub-partition in a best-effort way.
*
* @return maximum number of queued buffers per sub-partition
*/
int refreshAndGetMax() {
int max = 0;
for (ResultSubpartition part : partition.getAllPartitions()) {
int size = part.unsynchronizedGetNumberOfQueuedBuffers();
min = Math.min(min, size);
max = Math.max(max, size);
}
this.lastMin = min;
this.lastMax = max;
this.lastAvg = partition.getTotalNumberOfBuffers() / (float) partition.getNumberOfSubpartitions();
return max;
}
/**
* Iterates over all sub-partitions and collects the average number of queued buffers in a
* sub-partition in a best-effort way.
*
* @return average number of queued buffers per sub-partition
*/
float refreshAndGetAvg() {
long total = 0;
ResultSubpartition[] allPartitions = partition.getAllPartitions();
for (ResultSubpartition part : allPartitions) {
int size = part.unsynchronizedGetNumberOfQueuedBuffers();
total += size;
}
return total / (float) allPartitions.length;
}
// ------------------------------------------------------------------------
// Gauges to access the stats
// ------------------------------------------------------------------------
private Gauge<Long> getTotalQueueLenGauge() {
return new Gauge<Long>() {
@Override
public Long getValue() {
return refreshAndGetTotal();
}
};
}
private Gauge<Integer> getMinQueueLenGauge() {
return new Gauge<Integer>() {
@Override
......@@ -129,8 +161,9 @@ public class ResultPartitionMetrics {
public static void registerQueueLengthMetrics(MetricGroup group, ResultPartition partition) {
ResultPartitionMetrics metrics = new ResultPartitionMetrics(partition);
group.gauge("min-queue-len", metrics.getMinQueueLenGauge());
group.gauge("max-queue-len", metrics.getMaxQueueLenGauge());
group.gauge("avg-queue-len", metrics.getAvgQueueLenGauge());
group.gauge("totalQueueLen", metrics.getTotalQueueLenGauge());
group.gauge("minQueueLen", metrics.getMinQueueLenGauge());
group.gauge("maxQueueLen", metrics.getMaxQueueLenGauge());
group.gauge("avgQueueLen", metrics.getAvgQueueLenGauge());
}
}
......@@ -21,20 +21,17 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import java.util.Collection;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Collects metrics of an input gate.
*/
public class InputGateMetrics {
private final SingleInputGate inputGate;
private long lastTotal = -1;
private int lastMin = -1;
private int lastMax = -1;
private float lastAvg = -1.0f;
// ------------------------------------------------------------------------
private InputGateMetrics(SingleInputGate inputGate) {
......@@ -45,71 +42,95 @@ public class InputGateMetrics {
// these methods are package private to make access from the nested classes faster
/**
* Iterates over all input channels and collects the total number of queued buffers in a
* best-effort way.
*
* @return total number of queued buffers
*/
long refreshAndGetTotal() {
long total;
if ((total = lastTotal) == -1) {
refresh();
total = lastTotal;
long total = 0;
for (InputChannel channel : inputGate.getInputChannels().values()) {
if (channel instanceof RemoteInputChannel) {
RemoteInputChannel rc = (RemoteInputChannel) channel;
total += rc.unsynchronizedGetNumberOfQueuedBuffers();
}
}
lastTotal = -1;
return total;
}
/**
* Iterates over all input channels and collects the minimum number of queued buffers in a
* channel in a best-effort way.
*
* @return minimum number of queued buffers per channel (<tt>0</tt> if no channels exist)
*/
int refreshAndGetMin() {
int min;
if ((min = lastMin) == -1) {
refresh();
min = lastMin;
int min = Integer.MAX_VALUE;
Collection<InputChannel> channels = inputGate.getInputChannels().values();
if (channels.isEmpty()) {
// meaningful value when no channels exist:
return 0;
}
for (InputChannel channel : channels) {
if (channel instanceof RemoteInputChannel) {
RemoteInputChannel rc = (RemoteInputChannel) channel;
int size = rc.unsynchronizedGetNumberOfQueuedBuffers();
min = Math.min(min, size);
}
}
lastMin = -1;
return min;
}
/**
* Iterates over all input channels and collects the maximum number of queued buffers in a
* channel in a best-effort way.
*
* @return maximum number of queued buffers per channel
*/
int refreshAndGetMax() {
int max;
if ((max = lastMax) == -1) {
refresh();
max = lastMax;
}
int max = 0;
lastMax = -1;
return max;
}
for (InputChannel channel : inputGate.getInputChannels().values()) {
if (channel instanceof RemoteInputChannel) {
RemoteInputChannel rc = (RemoteInputChannel) channel;
float refreshAndGetAvg() {
float avg;
if ((avg = lastAvg) < 0.0f) {
refresh();
avg = lastAvg;
int size = rc.unsynchronizedGetNumberOfQueuedBuffers();
max = Math.max(max, size);
}
}
lastAvg = -1.0f;
return avg;
return max;
}
private void refresh() {
/**
* Iterates over all input channels and collects the average number of queued buffers in a
* channel in a best-effort way.
*
* @return average number of queued buffers per channel
*/
float refreshAndGetAvg() {
long total = 0;
int min = Integer.MAX_VALUE;
int max = 0;
int count = 0;
for (InputChannel channel : inputGate.getInputChannels().values()) {
if (channel.getClass() == RemoteInputChannel.class) {
if (channel instanceof RemoteInputChannel) {
RemoteInputChannel rc = (RemoteInputChannel) channel;
int size = rc.unsynchronizedGetNumberOfQueuedBuffers();
total += size;
min = Math.min(min, size);
max = Math.max(max, size);
count++;
++count;
}
}
this.lastMin = min;
this.lastMax = max;
this.lastAvg = total / (float) count;
return total / (float) count;
}
// ------------------------------------------------------------------------
......@@ -159,9 +180,9 @@ public class InputGateMetrics {
public static void registerQueueLengthMetrics(MetricGroup group, SingleInputGate gate) {
InputGateMetrics metrics = new InputGateMetrics(gate);
group.gauge("total-queue-len", metrics.getTotalQueueLenGauge());
group.gauge("min-queue-len", metrics.getMinQueueLenGauge());
group.gauge("max-queue-len", metrics.getMaxQueueLenGauge());
group.gauge("avg-queue-len", metrics.getAvgQueueLenGauge());
group.gauge("totalQueueLen", metrics.getTotalQueueLenGauge());
group.gauge("minQueueLen", metrics.getMinQueueLenGauge());
group.gauge("maxQueueLen", metrics.getMaxQueueLenGauge());
group.gauge("avgQueueLen", metrics.getAvgQueueLenGauge());
}
}
......@@ -23,11 +23,11 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
......@@ -390,27 +390,10 @@ public class Task implements Runnable, TaskActions {
++counter;
}
// register detailed network metrics, if configured
if (tmConfig.getBoolean(ConfigConstants.NETWORK_DETAILED_METRICS_KEY, false)) {
// output metrics
for (int i = 0; i < producedPartitions.length; i++) {
ResultPartitionMetrics.registerQueueLengthMetrics(
metricGroup.addGroup("netout_" + i), producedPartitions[i]);
}
for (int i = 0; i < inputGates.length; i++) {
InputGateMetrics.registerQueueLengthMetrics(
metricGroup.addGroup("netin_" + i), inputGates[i]);
}
}
invokableHasBeenCanceled = new AtomicBoolean(false);
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
// add metrics for buffers
this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
}
// ------------------------------------------------------------------------
......@@ -616,6 +599,28 @@ public class Task implements Runnable, TaskActions {
network.registerTask(this);
// add metrics for buffers
this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
// register detailed network metrics, if configured
if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
// similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
MetricGroup outputGroup = networkGroup.addGroup("Output");
MetricGroup inputGroup = networkGroup.addGroup("Input");
// output metrics
for (int i = 0; i < producedPartitions.length; i++) {
ResultPartitionMetrics.registerQueueLengthMetrics(
outputGroup.addGroup(i), producedPartitions[i]);
}
for (int i = 0; i < inputGates.length; i++) {
InputGateMetrics.registerQueueLengthMetrics(
inputGroup.addGroup(i), inputGates[i]);
}
}
// next, kick off the background copying of files for the distributed cache
try {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册