提交 a1379630 编写于 作者: P Piotr Nowojski 提交者: Piotr Nowojski

[FLINK-20718][metrics] Add busyTimeMsPerSecond metric

It's defined as inverted value of idleTimeMsPerSecond
上级 9c68f02c
......@@ -1258,6 +1258,11 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
<td>The time (in milliseconds) this task is back pressured per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>busyTimeMsPerSecond</td>
<td>The time (in milliseconds) this task is busy (neither idle nor back pressured) per second. Can be NaN, if the value could not be calculated.</td>
<td>Meter</td>
</tr>
<tr>
<th rowspan="6"><strong>Task/Operator</strong></th>
<td>numRecordsIn</td>
......
......@@ -1258,6 +1258,11 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
<td>The time (in milliseconds) this task is back pressured per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>busyTimeMsPerSecond</td>
<td>The time (in milliseconds) this task is busy (neither idle nor back pressured) per second. Can be NaN, if the value could not be calculated.</td>
<td>Meter</td>
</tr>
<tr>
<th rowspan="6"><strong>Task/Operator</strong></th>
<td>numRecordsIn</td>
......
......@@ -67,5 +67,6 @@ public class MetricNames {
}
public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
public static final String TASK_BUSY_TIME = "busyTimeMs" + SUFFIX_RATE;
public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE;
}
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
......@@ -46,8 +47,11 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Meter numRecordsOutRate;
private final Meter numBuffersOutRate;
private final Meter idleTimePerSecond;
private final Gauge busyTimePerSecond;
private final Meter backPressuredTimePerSecond;
private volatile boolean busyTimeEnabled;
public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);
......@@ -71,6 +75,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
meter(MetricNames.TASK_IDLE_TIME, new MeterView(new SimpleCounter()));
this.backPressuredTimePerSecond =
meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new SimpleCounter()));
this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond);
}
public IOMetrics createSnapshot() {
......@@ -109,6 +114,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
return backPressuredTimePerSecond;
}
public void setEnableBusyTime(boolean enabled) {
busyTimeEnabled = enabled;
}
private double getBusyTimePerSecond() {
double busyTime = idleTimePerSecond.getRate() + backPressuredTimePerSecond.getRate();
return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : Double.NaN;
}
// ============================================================================================
// Metric Reuse
// ============================================================================================
......
......@@ -80,6 +80,8 @@ public class SourceStreamTask<
StreamTaskActionExecutor.synchronizedExecutor(lock));
this.lock = Preconditions.checkNotNull(lock);
this.sourceThread = new LegacySourceFunctionThread();
getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}
@Override
......
......@@ -343,6 +343,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
new ExecutorThreadFactory("channel-state-unspilling"));
injectChannelStateWriterIntoChannels();
environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
}
private void injectChannelStateWriterIntoChannels() {
......
......@@ -110,7 +110,7 @@ public class SourceStreamTaskTest {
}
@Test(timeout = 60_000)
public void testStartDelayMetric() throws Exception {
public void testMetrics() throws Exception {
long sleepTime = 42;
StreamTaskMailboxTestHarnessBuilder<String> builder =
new StreamTaskMailboxTestHarnessBuilder<>(
......@@ -145,6 +145,8 @@ public class SourceStreamTaskTest {
(Gauge<Long>) metrics.get(MetricNames.CHECKPOINT_START_DELAY_TIME);
assertThat(
checkpointStartDelayGauge.getValue(), greaterThanOrEqualTo(sleepTime * 1_000_000));
Gauge<Double> busyTimeGauge = (Gauge<Double>) metrics.get(MetricNames.TASK_BUSY_TIME);
assertTrue(Double.isNaN(busyTimeGauge.getValue()));
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册