# 度量 > 译者:[flink.sojb.cn](https://flink.sojb.cn/) Flink公开了一个度量系统,允许收集和公开指标到外部系统。 ## 注册指标 您可以通过调用从扩展[RichFunction](https://flink.sojb.cn/dev/api_concepts.html#rich-functions)的任何用户函数访问度量标准系统`getRuntimeContext().getMetricGroup()`。此方法返回一个`MetricGroup`对象,您可以在该对象上创建和注册新指标。 ### 度量类型 Flink支持`Counters`,`Gauges`,`Histograms`和`Meters`。 #### 计数器 A `Counter`用于计算某些东西。可以使用`inc()/inc(long n)`或来Reduce当前值`dec()/dec(long n)`。您可以创建并注册`Counter`调用`counter(String name)`上`MetricGroup`。 * [**Java**](#tab_java_0) * [**Scala**](#tab_scala_0) ``` public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } ``` ``` class MyMapper extends RichMapFunction[String,String] { @transient private var counter: Counter = _ override def open(parameters: Configuration): Unit = { counter = getRuntimeContext() .getMetricGroup() .counter("myCounter") } override def map(value: String): String = { counter.inc() value } } ``` 或者,您也可以使用自己的`Counter`实现: * [**Java**](#tab_java_1) * [**Scala**](#tab_scala_1) ``` public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCustomCounter", new CustomCounter()); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } ``` ``` class MyMapper extends RichMapFunction[String,String] { @transient private var counter: Counter = _ override def open(parameters: Configuration): Unit = { counter = getRuntimeContext() .getMetricGroup() .counter("myCustomCounter", new CustomCounter()) } override def map(value: String): String = { counter.inc() value } } ``` #### 测量 A根据需要`Gauge`提供任何类型的值。为了使用a,`Gauge`您必须首先创建一个实现该`org.apache.flink.metrics.Gauge`接口的类。返回值的类型没有限制。你可以通过调用注册一个计`gauge(String name, Gauge gauge)`上`MetricGroup`。 * [**Java**](#tab_java_2) * [**Scala**](#tab_scala_2) ``` public class MyMapper extends RichMapFunction { private transient int valueToExpose = 0; @Override public void open(Configuration config) { getRuntimeContext() .getMetricGroup() .gauge("MyGauge", new Gauge() { @Override public Integer getValue() { return valueToExpose; } }); } @Override public String map(String value) throws Exception { valueToExpose++; return value; } } ``` ``` new class MyMapper extends RichMapFunction[String,String] { @transient private var valueToExpose = 0 override def open(parameters: Configuration): Unit = { getRuntimeContext() .getMetricGroup() .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) ) } override def map(value: String): String = { valueToExpose += 1 value } } ``` 请注意,报告会将公开的对象转换为a `String`,这意味着需要进行有意义的`toString()`实现。 #### 直方图 A `Histogram`衡量长值的分布。你可以通过调用注册一个`histogram(String name, Histogram histogram)`上一个`MetricGroup`。 * [**Java**](#tab_java_3) * [**Scala**](#tab_scala_3) ``` public class MyMapper extends RichMapFunction { private transient Histogram histogram; @Override public void open(Configuration config) { this.histogram = getRuntimeContext() .getMetricGroup() .histogram("myHistogram", new MyHistogram()); } @Override public Long map(Long value) throws Exception { this.histogram.update(value); return value; } } ``` ``` class MyMapper extends RichMapFunction[Long,Long] { @transient private var histogram: Histogram = _ override def open(parameters: Configuration): Unit = { histogram = getRuntimeContext() .getMetricGroup() .histogram("myHistogram", new MyHistogram()) } override def map(value: Long): Long = { histogram.update(value) value } } ``` Flink没有提供默认实现`Histogram`,但提供了一个允许使用Codahale / DropWizard直方图的[Wrapper](https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java)。要使用此打包,请在以下内容中添加以下依赖项`pom.xml`: ``` org.apache.flink flink-metrics-dropwizard 1.7-SNAPSHOT ``` 然后你可以像这样注册一个Codahale / DropWizard直方图: * [**Java**](#tab_java_4) * [**Scala**](#tab_scala_4) ``` public class MyMapper extends RichMapFunction { private transient Histogram histogram; @Override public void open(Configuration config) { com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500)); this.histogram = getRuntimeContext() .getMetricGroup() .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram)); } @Override public Long map(Long value) throws Exception { this.histogram.update(value); return value; } } ``` ``` class MyMapper extends RichMapFunction[Long, Long] { @transient private var histogram: Histogram = _ override def open(config: Configuration): Unit = { com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500)) histogram = getRuntimeContext() .getMetricGroup() .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram)) } override def map(value: Long): Long = { histogram.update(value) value } } ``` #### 仪表 A `Meter`衡量平均吞吐量。可以使用该`markEvent()`方法注册事件的发生。可以使用`markEvent(long n)`方法注册同时发生多个事件。你可以通过调用注册一个仪表`meter(String name, Meter meter)`上`MetricGroup`。 * [**Java**](#tab_java_5) * [**Scala**](#tab_scala_5) ``` public class MyMapper extends RichMapFunction { private transient Meter meter; @Override public void open(Configuration config) { this.meter = getRuntimeContext() .getMetricGroup() .meter("myMeter", new MyMeter()); } @Override public Long map(Long value) throws Exception { this.meter.markEvent(); return value; } } ``` ``` class MyMapper extends RichMapFunction[Long,Long] { @transient private var meter: Meter = _ override def open(config: Configuration): Unit = { meter = getRuntimeContext() .getMetricGroup() .meter("myMeter", new MyMeter()) } override def map(value: Long): Long = { meter.markEvent() value } } ``` Flink提供了一个允许使用Codahale / DropWizard表的[打包器](https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java)。要使用此打包,请在以下内容中添加以下依赖项`pom.xml`: ``` org.apache.flink flink-metrics-dropwizard 1.7-SNAPSHOT ``` 然后你可以像这样注册一个Codahale / DropWizard仪表: * [**Java**](#tab_java_6) * [**Scala**](#tab_scala_6) ``` public class MyMapper extends RichMapFunction { private transient Meter meter; @Override public void open(Configuration config) { com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter(); this.meter = getRuntimeContext() .getMetricGroup() .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter)); } @Override public Long map(Long value) throws Exception { this.meter.markEvent(); return value; } } ``` ``` class MyMapper extends RichMapFunction[Long,Long] { @transient private var meter: Meter = _ override def open(config: Configuration): Unit = { com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter() meter = getRuntimeContext() .getMetricGroup() .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter)) } override def map(value: Long): Long = { meter.markEvent() value } } ``` ## 范围 为每个度量标准分配一个标识符和一组键值对,在该键值对下将报告度量标准。 标识符基于3个组件:注册度量标准时的用户定义名称,可选的用户定义范围和系统提供的范围。例如,如果`A.B`是系统范围,`C.D`用户范围和`E`名称,则度量标识符将是`A.B.C.D.E`。 您可以`.`通过设置`metrics.scope.delimiter`Keys来配置要用于标识符的分隔符(默认值:) `conf/flink-conf.yaml`。 ### 用户范围 你可以通过调用定义用户范围`MetricGroup#addGroup(String name)`,`MetricGroup#addGroup(int name)`或`Metric#addGroup(String key, String value)`。这些方法影响什么`MetricGroup#getMetricIdentifier`和`MetricGroup#getScopeComponents`返回。 * [**Java**](#tab_java_7) * [**Scala**](#tab_scala_7) ``` counter = getRuntimeContext() .getMetricGroup() .addGroup("MyMetrics") .counter("myCounter"); counter = getRuntimeContext() .getMetricGroup() .addGroup("MyMetricsKey", "MyMetricsValue") .counter("myCounter"); ``` ``` counter = getRuntimeContext() .getMetricGroup() .addGroup("MyMetrics") .counter("myCounter") counter = getRuntimeContext() .getMetricGroup() .addGroup("MyMetricsKey", "MyMetricsValue") .counter("myCounter") ``` ### 系统范围 系统范围包含有关度量标准的上下文信息,例如,它在哪个任务中注册或该任务属于哪个作业。 可以通过设置以下键来配置应包含哪些上下文信息`conf/flink-conf.yaml`。这些键中的每一个都期望一个格式字符串可能包含常量(例如“taskmanager”)和变量(例如“<task_id>”),它们将在运行时被替换。 * `metrics.scope.jm` * 默认值:<host> .jobmanager * 应用于作用域JobManager的所有指标。 * `metrics.scope.jm.job` * 默认值:<host> .jobmanager。<job_name> * 应用于作用于JobManager和作业的所有度量标准。 * `metrics.scope.tm` * 默认值:<host> .taskmanager。<tm_id> * 应用于作用于TaskManager的所有度量标准。 * `metrics.scope.tm.job` * 默认值:<host> .taskmanager。<tm_id>。<job_name> * 应用于作用于TaskManager和作业的所有度量标准。 * `metrics.scope.task` * 默认值:<host> .taskmanager。<tm_id>。<job_name>。<task_name>。<subtask_index> * 应用于作用于任务的所有指标。 * `metrics.scope.operator` * 默认值:<host> .taskmanager。<tm_id>。<job_name>。<operator_name>。<subtask_index> * 应用于作用于算子的所有指标。 变量的数量或顺序没有限制。变量区分大小写。 算子指标的默认范围将产生类似于的标识符 `localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric` 如果您还想包含任务名称但省略TaskManager信息,则可以指定以下格式: `metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>` 这可以创建标识符`localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric`。 请注意,对于此格式字符串,如果同时多次运行同一作业,则可能发生标识符冲突,这可能导致度量标准数据不一致。因此,建议使用通过包含ID(例如<job_id>)或通过为作业和 算子分配唯一名称来提供一定程度的唯一性的格式字符串。 ### 所有变量列表 * JobManager:<host> * TaskManager:<host>,<tm_id> * 作业:<job_id>,<作业名称> * 任务:<task_id>,<task_name>,<task_attempt_id>,<task_attempt_num>,<subtask_index> * 算子:<operator_id>,<operator_name>,<subtask_index> **要点:**对于Batch API,<operator_id>始终等于<task_id>。 ### 用户变量 您可以通过调用来定义用户变量`MetricGroup#addGroup(String key, String value)`。这种方法会影响什么`MetricGroup#getMetricIdentifier`,`MetricGroup#getScopeComponents`并`MetricGroup#getAllVariables()`返回。 **重要提示:**用户变量不能用于范围格式。 * [**Java**](#tab_java_8) * [**Scala**](#tab_scala_8) ``` counter = getRuntimeContext() .getMetricGroup() .addGroup("MyMetricsKey", "MyMetricsValue") .counter("myCounter"); ``` ``` counter = getRuntimeContext() .getMetricGroup() .addGroup("MyMetricsKey", "MyMetricsValue") .counter("myCounter") ``` ## 报告 通过配置一个或多个报告,可以将度量标准暴露给外部系统`conf/flink-conf.yaml`。这些报告将在每个工作和TaskManager启动时进行实例化。 * `metrics.reporter.<name>.<config>`:`<config>`报告的通用设置命名`<name>`。 * `metrics.reporter.<name>.class`:报告类用于为报告命名`<name>`。 * `metrics.reporter.<name>.interval`:报告间隔用于报告的名字`<name>`。 * `metrics.reporter.<name>.scope.delimiter`:用于名称的报告者的标识符(默认值使用`metrics.scope.delimiter`)的分隔符`<name>`。 * `metrics.reporters`:(可选)以逗号分隔的包含报告名称列表。默认情况下,将使用所有已配置的报告。 所有报告必须至少拥有该`class`财产,其中一些允许指定报告`interval`。下面,我们将列出针对每位报告的更多设置。 示例报表配置,指定多个报告: ``` metrics.reporters: my_jmx_reporter,my_other_reporter metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter metrics.reporter.my_jmx_reporter.port: 9020-9040 metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter metrics.reporter.my_other_reporter.host: 192.168.1.1 metrics.reporter.my_other_reporter.port: 10000 ``` **重要说明:**启动Flink时,通过将其放在/ lib文件夹中,可以访问包含报告者的jar。 您可以`Reporter`通过实现`org.apache.flink.metrics.reporter.MetricReporter`接口编写自己的。如果Reporter应定期发送报告,您还必须实现该`Scheduled`接口。 以下部分列出了受支持的报告。 ### JMX(org.apache.flink.metrics.jmx.JMXReporter) 您不必包含其他依赖项,因为默认情况下JMX报告器可用但未激活。 参数: * `port` - (可选)JMX侦听连接的端口。为了能够在一个主机上运行多个报告实例(例如,当一个TaskManager与JobManager共同使用时),建议使用类似的端口范围`9250-9260`。指定范围时,实际端口将显示在相关作业或TaskManager日志中。如果设置此设置,Flink将为给定的端口/范围启动额外的JMX连接器。度量标准始终在默认的本地JMX界面上可用。 配置示例: ``` metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter metrics.reporter.jmx.port: 8789 ``` 通过JMX公开的度量标准由域和键属性列表标识,这些键属性一起形成对象名称。 域始终以`org.apache.flink`广义度量标识符开头。与通常的标识符相反,它不受作用域格式的影响,不包含任何变量,并且在作业中保持不变。这种域的一个例子是`org.apache.flink.job.task.numBytesOut`。 键属性列表包含与给定度量关联的所有变量的值,无论配置的范围格式如何。这样一个列表的一个例子是`host=localhost,job_name=MyJob,task_name=MyTask`。 因此,域标识度量标准类,而关键属性列表标识该度量标准的一个(或多个)实例。 ### Ganglia(org.apache.flink.metrics.ganglia.GangliaReporter) 要使用此报告,您必须复制`/opt/flink-metrics-ganglia-1.7-SNAPSHOT.jar`到`/lib`Flink发行版的文件夹中。 参数: * `host`-下配置的的gmond主机地址`udp_recv_channel.bind`在`gmond.conf` * `port`-下配置的端口的gmond `udp_recv_channel.port`在`gmond.conf` * `tmax` - 应保存旧度量标准的软限制 * `dmax` - 应保存旧指标多长时间的硬限制 * `ttl` - 传输的UDP数据包的生存时间 * `addressingMode` - 要使用的UDP寻址模式(UNICAST / MULTICAST) 配置示例: ``` metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter metrics.reporter.gang.host: localhost metrics.reporter.gang.port: 8649 metrics.reporter.gang.tmax: 60 metrics.reporter.gang.dmax: 0 metrics.reporter.gang.ttl: 1 metrics.reporter.gang.addressingMode: MULTICAST ``` ### Graphite(org.apache.flink.metrics.graphite.GraphiteReporter) 要使用此报告,您必须复制`/opt/flink-metrics-graphite-1.7-SNAPSHOT.jar`到`/lib`Flink发行版的文件夹中。 参数: * `host` - Graphite服务器主机 * `port` - Graphite服务器端口 * `protocol` - 使用协议(TCP / UDP) 配置示例: ``` metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter metrics.reporter.grph.host: localhost metrics.reporter.grph.port: 2003 metrics.reporter.grph.protocol: TCP ``` ### Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter) 要使用此报告,您必须复制`/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar`到`/lib`Flink发行版的文件夹中。 参数: * `port`- (可选)Prometheus导出器侦听的端口,默认为[9249](https://github.com/prometheus/prometheus/wiki/Default-port-allocations)。为了能够在一个主机上运行多个报告实例(例如,当一个TaskManager与JobManager共同使用时),建议使用类似的端口范围`9250-9260`。 配置示例: ``` metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter ``` Flink度量标准类型映射到Prometheus度量标准类型,如下所示: | Flink | Prometheus | 注意 | | --- | --- | --- | | 计数器 | 测量 | Prometheus 计数器不能Reduce。 | | 测量 | 测量 | 仅支持数字和布尔值。 | | 直方图 | 概要 | 分位数.5,.75,.95,.98,.99和.999 | | 仪表 | 测量 | 仪表输出仪表的速率。 | 所有Flink度量变量(请参阅[所有变量列表](#list-of-all-variables))都将作为标签导出到Prometheus。 ### PrometheusPushGateway(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter) 要使用此报告,您必须复制`/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar`到`/lib`Flink发行版的文件夹中。 参数: | 键 | 默认 | 描述 | | --- | --- | --- | | `deleteOnShutdown` | true | 指定是否在关闭时从PushGateway中删除指标。 | | `Host` | (none) | PushGateway服务器主机。 | | `jobName` | (none) | 将推送指标的作业名称 | | `port` | -1 | PushGateway服务器端口。 | | `randomJobNameSuffix` | true | 指定是否应将随机后缀附加到作业名称。 | 配置示例: ``` metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter metrics.reporter.promgateway.host: localhost metrics.reporter.promgateway.port: 9091 metrics.reporter.promgateway.jobName: myJob metrics.reporter.promgateway.randomJobNameSuffix: true metrics.reporter.promgateway.deleteOnShutdown: false ``` PrometheusPushGatewayReporter将指标推送到[Pushgateway](https://github.com/prometheus/pushgateway),可由Prometheus [抓取](https://github.com/prometheus/pushgateway)。 有关用例,请参阅[Prometheus文档](https://prometheus.io/docs/practices/pushing/)。 ### StatsD(org.apache.flink.metrics.statsd.StatsDReporter) 要使用此报告,您必须复制`/opt/flink-metrics-statsd-1.7-SNAPSHOT.jar`到`/lib`Flink发行版的文件夹中。 参数: * `host` - StatsD服务器主机 * `port` - StatsD服务器端口 配置示例: ``` metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter metrics.reporter.stsd.host: localhost metrics.reporter.stsd.port: 8125 ``` ### Datadog(org.apache.flink.metrics.datadog.DatadogHttpReporter) 要使用此报告,您必须复制`/opt/flink-metrics-datadog-1.7-SNAPSHOT.jar`到`/lib`Flink发行版的文件夹中。 注意Flink指标,如任何变量`<host>`,`<job_name>`,`<tm_id>`,`<subtask_index>`,`<task_name>`,和`<operator_name>`,将被发送到Datadog的标签。标签看起来像`host:localhost`和`job_name:myjobname`。 参数: * `apikey` - Datadog APIKeys * `tags` - (可选)发送到Datadog时将应用于度量标准的全局标记。标签应仅以逗号分隔 配置示例: ``` metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter metrics.reporter.dghttp.apikey: xxx metrics.reporter.dghttp.tags: myflinkapp,prod ``` ### Slf4j(org.apache.flink.metrics.slf4j.Slf4jReporter) 要使用此报告,您必须复制`/opt/flink-metrics-slf4j-1.7-SNAPSHOT.jar`到`/lib`Flink发行版的文件夹中。 配置示例: ``` metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter metrics.reporter.slf4j.interval: 60 SECONDS ``` ## 系统指标 默认情况下,Flink会收集几个指标,这些指标可以提供有关当前状态的深入见解。本节是所有这些指标的参考。 下表通常包含5列: * “范围”列描述了用于生成系统范围的范围格式。例如,如果单元格包含“Operator”,则使用“metrics.scope.operator”的范围格式。如果单元格包含多个值(以斜杠分隔),则会针对不同的实体多次报告度量标准,例如作业和TaskManager。 * (可选)“Infix”列描述了哪个中缀附加到系统范围。 * “度量标准”列列出了为给定范围和中缀注册的所有度量标准的名称。 * “描述”列提供有关给定度量正在测量的信息。 * “类型”列描述了用于测量的度量类型。 请注意,中缀/指标名称列中的所有点仍受“metrics.delimiter”设置的约束。 因此,为了推断度量标识符: 1. 根据“范围”列获取范围格式 2. 如果存在,则将值附加到“中缀”列中,并考虑“metrics.delimiter”设置 3. 附加指标名称。 ### 中央处理器 | 范围 | 中缀 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | --- | | **Job-/TaskManager** | Status.JVM.CPU | 加载 | JVM最近的CPU使用情况。 | 测量 | | 时间 | JVM使用的CPU时间。 | 测量 | ### 内存 | 范围 | 中缀 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | --- | | **Job-/TaskManager** | Status.JVM.Memory | Heap.Used | 当前使用的堆内存量(以字节为单位)。 | 测量 | | Heap.Committed | 保证可供JVM使用的堆内存量(以字节为单位)。 | 测量 | | Heap.Max | 可用于内存管理的最大堆内存量(以字节为单位)。 | 测量 | | NonHeap.Used | 当前使用的非堆内存量(以字节为单位)。 | 测量 | | NonHeap.Committed | 保证JVM可用的非堆内存量(以字节为单位)。 | 测量 | | NonHeap.Max | 可用于内存管理的最大非堆内存量(以字节为单位)。 | 测量 | | Direct.Count | 直接缓冲池中的缓冲区数。 | 测量 | | Direct.MemoryUsed | JVM用于直接缓冲池的内存量(以字节为单位)。 | 测量 | | Direct.TotalCapacity | 直接缓冲池中所有缓冲区的总容量(以字节为单位)。 | 测量 | | Mapped.Count | 映射缓冲池中的缓冲区数。 | 测量 | | Mapped.MemoryUsed | JVM用于映射缓冲池的内存量(以字节为单位)。 | 测量 | | Mapped.TotalCapacity | 映射缓冲池中的缓冲区数(以字节为单位)。 | 测量 | ### 线程 | 范围 | 中缀 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | --- | | **Job-/TaskManager** | Status.JVM.Threads | 计数 | 活动线程总数。 | 测量 | ### 垃圾收集 | 范围 | 中缀 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | --- | | **Job-/TaskManager** | Status.JVM.GarbageCollector | <GarbageCollector> .Count之间 | 已发生的集合总数。 | 测量 | | <GarbageCollector>。时间 | 执行垃圾收集所花费的总时间。 | 测量 | ### 类加载器 | 范围 | 中缀 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | --- | | **Job-/TaskManager** | Status.JVM.ClassLoader | ClassesLoaded | 自JVM启动以来加载的类总数。 | 测量 | | ClassesUnloaded | 自JVM启动以来卸载的类总数。 | 测量 | ### 网络 | 范围 | 中缀 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | --- | | **TaskManager** | Status.Network | AvailableMemorySegments | 未使用的内存段数。 | 测量 | | TotalMemorySegments | 分配的内存段数。 | 测量 | | Task | buffers | inputQueueLength | 排队的输入缓冲区数。 | 测量 | | outputQueueLength | 排队输出缓冲区的数量。 | 测量 | | inPoolUsage | 估计输入缓冲区的使用情况。 | 测量 | | outPoolUsage | 估计输出缓冲区的使用情况。 | 测量 | | Network.<Input|Output>.<gate>(only available if taskmanager.net.detailed-metrics config option is set) | totalQueueLen | 所有输入/输出通道中排队缓冲区的总数。 | 测量 | | minQueueLen | 所有输入/输出通道中的最小排队缓冲区数。 | 测量 | | maxQueueLen | 所有输入/输出通道中的最大排队缓冲区数。 | 测量 | | avgQueueLen | 所有输入/输出通道中的平均缓冲区数。 | 测量 | ### 集群 | 范围 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | | **JobManager** | numRegisteredTaskManagers | 注册任务管理员的数量。 | 测量 | | numRunningJobs | 正在运行的作业数量。 | 测量 | | taskSlotsAvailable | 可用任务槽的数量。 | 测量 | | taskSlotsTotal | 任务槽的总数。 | 测量 | ### 可用性 | 范围 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | | **Job (only available on JobManager)** | restartingTime | 重新启动作业所花费的时间,或当前重新启动的持续时间(以毫秒为单位)。 | 测量 | | uptime | 作业运行的时间不间断。对于已完成的作业,返回-1(以毫秒为单位)。 | 测量 | | downtime | 对于当前处于故障/恢复状态的作业,在此中断期间经过的时间。对于正在运行的作业返回0,对于已完成的作业返回-1(以毫秒为单位)。 | 测量 | | fullRestarts | 自提交此作业以来完全重新启动的总次数。 | 测量 | ### 检查点 | 范围 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | | **Job (only available on JobManager)** | lastCheckpointDuration | 完成最后一个检查点所花费的时间(以毫秒为单位)。 | 测量 | | lastCheckpointSize | 最后一个检查点的总大小(以字节为单位)。 | 测量 | | lastCheckpointExternalPath | 存储最后一个外部检查点的路径。 | 测量 | | lastCheckpointRestoreTimestamp | 在协调器上恢复最后一个检查点时的时间戳(以毫秒为单位)。 | 测量 | | lastCheckpointAlignmentBuffered | 在最后一个检查点的所有子任务上进行对齐期间的缓冲字节数(以字节为单位)。 | 测量 | | numberOfInProgressCheckpoints | 进行中检查点的数量。 | 测量 | | numberOfCompletedCheckpoints | 成功完成检查点的数量。 | 测量 | | numberOfFailedCheckpoints | 失败检查点的数量。 | 测量 | | totalNumberOfCheckpoints | 总检查点的数量(正在进行,已完成,失败)。 | 测量 | | Task | checkpointAlignmentTime | 最后一次屏障对齐完成所花费的时间(以纳秒为单位),或当前对齐到目前为止所用的时间(以纳秒为单位)。 | 测量 | ### IO | 范围 | 度量 | 描述 | 类型 | | --- | --- | --- | --- | | **Job (only available on TaskManager)** | <SOURCE_ID> <source_subtask_index> <operator_id> <operator_subtask_index> .latency | 从给定源子任务到算子子任务的延迟分布(以毫秒为单位)。 | 直方图 | | **任务** | numBytesInLocal | 此任务从本地源读取的总字节数。 | 计数器 | | numBytesInLocalPerSecond | 此任务每秒从本地源读取的字节数。 | 仪表 | | numBytesInRemote | 此任务从远程源读取的总字节数。 | 计数器 | | numBytesInRemotePerSecond | 此任务每秒从远程源读取的字节数。 | 仪表 | | numBuffersInLocal | 此任务从本地源读取的网络缓冲区总数。 | 计数器 | | numBuffersInLocalPerSecond | 此任务每秒从本地源读取的网络缓冲区数。 | 仪表 | | numBuffersInRemote | 此任务从远程源读取的网络缓冲区总数。 | 计数器 | | numBuffersInRemotePerSecond | 此任务每秒从远程源读取的网络缓冲区数。 | 仪表 | | numBytesOut | 此任务已发出的总字节数。 | 计数器 | | numBytesOutPerSecond | 此任务每秒发出的字节数。 | 仪表 | | numBuffersOut | 此任务已发出的网络缓冲区总数。 | 计数器 | | numBuffersOutPerSecond | 此任务每秒发出的网络缓冲区数。 | 仪表 | | **任务/算子** | numRecordsIn | 此 算子/任务已收到的记录总数。 | 计数器 | | numRecordsInPerSecond | 此 算子/任务每秒接收的记录数。 | 仪表 | | numRecordsOut | 此 算子/任务已发出的记录总数。 | 计数器 | | numRecordsOutPerSecond | 此 算子/任务每秒发送的记录数。 | 仪表 | | numLateRecordsDropped | 此算子/任务因迟到而丢失的记录数。 | 计数器 | | currentInputWatermark | 此 算子/任务收到的​​最后一个水印(以毫秒为单位)。**注意:**对于具有2个输入的算子/任务,这是最后收到的水印的最小值。 | 测量 | | **算子** | currentInput1Watermark | 此 算子在其第一个输入(毫秒)中收到的最后一个水印。**注意:**仅适用于具有2个输入的算子。 | 测量 | | currentInput2Watermark | 此 算子在其第二个输入中接收的最后一个水印(以毫秒为单位)。**注意:**仅适用于具有2个输入的算子。 | 测量 | | currentOutputWatermark | 此 算子发出的最后一个水印(以毫秒为单位)。 | 测量 | | numSplitsProcessed | 此数据源已处理的InputSplits总数(如果 算子是数据源)。 | 测量 | ### 连接器 #### Kafka连接器 | 范围 | 度量 | 用户变量 | 描述 | 类型 | | --- | --- | --- | --- | --- | | 算子 | commitsSucceeded | N / A | 如果启用了偏移提交并且启用了检查点,则成功向Kafka提交的偏移提交总数。 | 计数器 | | 算子 | commitsFailed | N / A | 如果启用了偏移提交并且启用了检查点,则Kafka的偏移提交失败总数。请注意,将偏移量提交回Kafka只是暴露消费者进度的一种方法,因此提交失败不会影响Flink的检查点分区偏移的完整性。 | 计数器 | | 算子 | committedOffsets | Topic,分区 | 对于每个分区,最后成功提交到Kafka的偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 | 测量 | | 算子 | currentOffsets | Topic,分区 | 消费者对每个分区的当前读取偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 | 测量 | #### Kinesis连接器 | 范围 | 度量 | 用户变量 | 描述 | 类型 | | --- | --- | --- | --- | --- | | 算子 | millisBehindLatest | stream,shardId | 对于每个Kinesis分片,消费者在流的头部后面的毫秒数,表示消费者当前时间落后多少。可以通过流名称和分片标识指定特定分片的度量标准。值为0表示记录处理被捕获,此时没有要处理的新记录。值-1表示该度量标准尚未报告。 | 测量 | | 算子 | sleepTimeMillis | stream,shardId | 消费者在从Kinesis获取记录之前花费的毫秒数。可以通过流名称和分片标识指定特定分片的度量标准。 | 测量 | | 算子 | maxNumberOfRecordsPerFetch | stream,shardId | 消费者在单个getRecords调用Kinesis时请求的最大记录数。如果ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS设置为true,则自适应地计算此值以最大化Kinesis的2 Mbps读取限制。 | 测量 | | 算子 | numberOfAggregatedRecordsPerFetch | stream,shardId | 消费者在单个getRecords调用Kinesis时获取的聚合Kinesis记录数。 | 测量 | | 算子 | numberOfDeggregatedRecordsPerFetch | stream,shardId | 消费者在单个getRecords调用Kinesis时获取的分解Kinesis记录的数量。 | 测量 | | 算子 | averageRecordSizeBytes | stream,shardId | Kinesis记录的平均大小(以字节为单位),由消费者在单个getRecords调用中获取。 | 测量 | | 算子 | runLoopTimeNanos | stream,shardId | 消费者在运行循环中花费的实际时间(以纳秒为单位)。 | 测量 | | 算子 | loopFrequencyHz | stream,shardId | 一秒钟内调用getRecords的次数。 | 测量 | | 算子 | bytesRequestedPerFetch | stream,shardId | 在一次调用getRecords中请求的字节数(2 Mbps / loopFrequencyHz)。 | 测量 | ### 系统资源 默认情况下禁用系统资源报告。当`metrics.system-resource` 启用下面列出的指标将是可利用的作业-与TaskManager。系统资源度量标准会定期更新,并显示已配置间隔(`metrics.system-resource-probing-interval`)的平均值。 系统资源报告要求在类路径上存在可选的依赖项(例如,放在Flink的`lib`目录中): * `com.github.oshi:oshi-core:3.4.0` (根据EPL 1.0许可证授权) 包括它的传递依赖: * `net.java.dev.jna:jna-platform:jar:4.2.2` * `net.java.dev.jna:jna:jar:4.2.2` 这方面的失败将被报告为启动期间`NoClassDefFoundError` 记录的警告消息`SystemResourcesMetricsInitializer`。 #### 系统CPU | 范围 | 中缀 | 度量 | 描述 | | --- | --- | --- | --- | | **Job-/TaskManager** | System.CPU | 用法 | 机器上CPU使用率的总体百分比。 | | 闲 | 机器上CPU空闲使用率的百分比。 | | SYS | 计算机上系统CPU使用率的百分比。 | | 用户 | 计算机上用户CPU使用率的百分比。 | | IOWAIT | 计算机上IOWait CPU使用率的百分比。 | | IRQ | 机器上Irq CPU使用率的百分比。 | | 软中断 | 计算机上SoftIrq CPU使用率的百分比。 | | 尼斯 | 在机器上使用Nice Idle的百分比。 | | Load1min | 平均CPU负载超过1分钟 | | Load5min | 平均CPU负载超过5分钟 | | Load15min | 平均CPU负载超过15分钟 | | UsageCPU * | 每个处理器的CPU使用率百分比 | #### 系统内存 | 范围 | 中缀 | 度量 | 描述 | | --- | --- | --- | --- | | **Job-/TaskManager** | System.Memory | 可得到 | 可用内存字节数 | | 总 | 总内存(字节) | | System.Swap | 用过的 | 使用的交换字节 | | 总 | 总交换字节数 | #### 系统网络 | 范围 | 中缀 | 度量 | 描述 | | --- | --- | --- | --- | | **Job-/TaskManager** | System.Network.INTERFACE_NAME | ReceiveRate | 平均接收速率,以每秒字节数为单位 | | SendRate | 平均发送速率,以字节/秒为单位 | ## 延迟跟踪 Flink允许跟踪通过系统传输的记录的延迟。默认情况下禁用此函数。为了使延迟跟踪你必须设置`latencyTrackingInterval`在无论是正数 [Flink配置](https://flink.sojb.cn/ops/config.html#metrics-latency-interval)或`ExecutionConfig`。 在`latencyTrackingInterval`,源将定期发出一个特殊的记录,称为`LatencyMarker`。标记包含从源发出记录时的时间戳。延迟标记不能超过常规用户记录,因此如果记录在算子面前排队,则会增加标记跟踪的延迟。 请注意,延迟标记不会考虑用户记录在算子中绕过它们的时间。特别是标记不考虑记录在窗口缓冲区中花费的时间。只有当算子无法接受新记录,因此他们排队时,使用标记测量的延迟才会反映出来。 所有中间 算子都会保存`n`每个源的最后一个延迟列表,以计算延迟分布。接收器算子保存每个源的列表,以及每个并行源实例,以允许检测由各个机器引起的延迟问题。 目前,Flink假定群集中所有计算机的时钟都是同步的。我们建议设置自动时钟同步服务(如NTP)以避免错误的延迟结果。 警告启用延迟指标可能会显着影响群集的性能。强烈建议仅将它们用于调试目的。 ## REST API集成 可以通过[Monitoring REST API](https://flink.sojb.cn/monitoring/rest_api.html)查询度量标准。 下面是可用端点列表,带有示例JSON响应。所有端点都是样本表单`http://hostname:8081/jobmanager/metrics`,下面我们仅列出URL 的_路径_部分。 尖括号中的值是变量,例如`http://hostname:8081/jobs/<jobid>/metrics`,必须请求变量`http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics`。 请求特定实体的指标: * `/jobmanager/metrics` * `/taskmanagers/<taskmanagerid>/metrics` * `/jobs/<jobid>/metrics` * `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>` 请求在相应类型的所有实体之间聚合的指标: * `/taskmanagers/metrics` * `/jobs/metrics` * `/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics` 请求在相应类型的所有实体的子集上聚合的度量标准: * `/taskmanagers/metrics?taskmanagers=A,B,C` * `/jobs/metrics?jobs=D,E,F` * `/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3` 请求可用指标列表: `GET /jobmanager/metrics` ``` [ { "id": "metric1" }, { "id": "metric2" } ] ``` 请求特定(未聚合)指标的值: `GET taskmanagers/ABCDE/metrics?get=metric1,metric2` ``` [ { "id": "metric1", "value": "34" }, { "id": "metric2", "value": "2" } ] ``` 请求特定指标的汇总值: `GET /taskmanagers/metrics?get=metric1,metric2` ``` [ { "id": "metric1", "min": 1, "max": 34, "avg": 15, "sum": 45 }, { "id": "metric2", "min": 2, "max": 14, "avg": 7, "sum": 16 } ] ``` 请求特定指标的特定聚合值: `GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max` ``` [ { "id": "metric1", "min": 1, "max": 34, }, { "id": "metric2", "min": 2, "max": 14, } ] ``` ## 仪表板集成 为每个任务或算子收集的度量标准也可以在仪表板中显示。在作业的主页面上,选择`Metrics`选项卡。选择顶部图表中的一个任务后,您可以使用`Add Metric`下拉菜单选择要显示的指标。 * 任务指标列为`<subtask_index>.<metric_name>`。 * 算子指标列为`<subtask_index>.<operator_name>.<metric_name>`。 每个度量将可视化为单独的图形,x轴表示时间,y轴表示测量值。所有图表每10秒自动更新一次,并在导航到另一页时继续更新。 可视化指标的数量没有限制; 但是只能显示数字指标。