# 基本API概念 > 译者:[flink.sojb.cn](https://flink.sojb.cn/) Flink程序是实现分布式集合转换的常规程序(例如,Filter,映射,更新状态,Join,分组,定义窗口,聚合)。集合最初是从源创建的(例如,通过读取文件,kafka主题或从本地的内存中集合)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如,命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。 根据数据源的类型(即有界或无界源),您可以编写批处理程序或流程序,其中DataSet API用于批处理,DataStream API用于流式处理。本指南将介绍两种API共有的基本概念,但请参阅我们的 [流处理指南](https://flink.sojb.cn/dev/datastream_api.html)和 [批处理指南](https://flink.sojb.cn/dev/batch/index.html),了解有关使用每个API编写程序的具体信息。 **注:**当显示的API时,如何使用,我们将用实际的例子 `StreamingExecutionEnvironment`和`DataStream`API。`DataSet`API中的概念完全相同,只需替换为`ExecutionEnvironment`和`DataSet`。 ## DataSet和DataStream Flink具有特殊类`DataSet`并`DataStream`在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。在`DataSet`数据有限的情况下,对于一个`DataStream`数据元的数量可以是无界的。 这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除数据元。你也不能简单地检查里面的数据元。 集合最初通过在Flink程序添加源创建和新的集合从这些通过将它们使用API方法如衍生`map`,`filter`等等。 ## Flink计划的剖析 Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分: 1. 获得一个`execution environment`, 2. 加载/创建初始数据, 3. 指定此数据的转换, 4. 指定放置计算结果的位置, 5. 触发程序执行 * [**Java**](#tab_java_0) * [**Scala**](#tab_scala_0) 我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Java DataSet API的所有核心类都可以在[org.apache.flink.api.java](https://github.com/apache/flink/blob/master//flink-java/src/main/java/org/apache/flink/api/java)包中找到, 而Java DataStream API的类可以在[org.apache.flink.streaming.api中](https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api)找到 。 这`StreamExecutionEnvironment`是所有Flink计划的基础。您可以使用以下静态方法获取一个`StreamExecutionEnvironment`: ``` getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host, int port, String... jarFiles) ``` 通常,您只需要使用`getExecutionEnvironment()`,因为这将根据上下文做正确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通过[命令行](https://flink.sojb.cn/ops/cli.html)调用它 ,则Flink集群管理器将执行您的main方法并`getExecutionEnvironment()`返回一个运行环境,以便在集群上执行您的程序。 对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用: ``` final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream text = env.readTextFile("file:///path/to/file"); ``` 这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生DataStream。 您可以通过使用转换函数调用DataStream上的方法来应用转换。例如,Map转换如下所示: ``` DataStream input = ...; DataStream parsed = input.map(new MapFunction() { @Override public Integer map(String value) { return Integer.parseInt(value); } }); ``` 这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。 一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法: ``` writeAsText(String path) print() ``` 我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Scala DataSet API的所有核心类都可以在[org.apache.flink.api.scala](https://github.com/apache/flink/blob/master//flink-scala/src/main/scala/org/apache/flink/api/scala)包中找到, 而Scala DataStream API的类可以在[org.apache.flink.streaming.api.scala中](https://github.com/apache/flink/blob/master//flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala)找到 。 这`StreamExecutionEnvironment`是所有Flink计划的基础。您可以使用以下静态方法获取一个`StreamExecutionEnvironment`: ``` getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(host: String, port: Int, jarFiles: String*) ``` 通常,您只需要使用`getExecutionEnvironment()`,因为这将根据上下文做正确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通过[命令行](//ci.apache.org/projects/flink/flink-docs-master/ops/cli.html)调用它 ,则Flink集群管理器将执行您的main方法并`getExecutionEnvironment()`返回一个运行环境,以便在集群上执行您的程序。 对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用: ``` val env = StreamExecutionEnvironment.getExecutionEnvironment() val text: DataStream[String] = env.readTextFile("file:///path/to/file") ``` 这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生DataStream。 您可以通过使用转换函数调用DataSet上的方法来应用转换。例如,Map转换如下所示: ``` val input: DataSet[String] = ... val mapped = input.map { x => x.toInt } ``` 这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。 一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法: ``` writeAsText(path: String) print() ``` 一旦您指定的完整程序,你需要**触发执行程序**调用 `execute()`上`StreamExecutionEnvironment`。根据执行的类型,`ExecutionEnvironment`将在本地计算机上触发执行或提交程序以在群集上执行。 该`execute()`方法返回一个`JobExecutionResult`,包含执行时间和累加器结果。 有关流数据源和接收器的信息,请参阅[流指南](https://flink.sojb.cn/dev/datastream_api.html),以及有关DataStream上支持的转换的更深入信息。 有关批处理数据源和接收器的信息,请查看[批处理指南](https://flink.sojb.cn/dev/batch/index.html),以及有关DataSet支持的转换的更深入信息。 ## 懒惰的评价 所有Flink程序都是懒惰地执行:当执行程序的main方法时,数据加载和转换不会直接发生。而是创建每个 算子操作并将其添加到程序的计划中。当`execute()`运行环境上的调用显式触发执行时,实际执行 算子操作。程序是在本地执行还是在集群上执行取决于运行环境的类型 懒惰的评估使您可以构建Flink作为一个整体计划单元执行的复杂程序。 ## 指定Keys 某些转换(join,coGroup,keyBy,groupBy)要求在数据元集合上定义键。其他转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在Keys上分组。 DataSet被分组为 ``` DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/); ``` 虽然可以使用DataStream指定Keys ``` DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/); ``` Flink的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。 **注意:**在下面的讨论中,我们将使用`DataStream`API和`keyBy`。对于DataSet API,您只需要替换为`DataSet`和`groupBy`。 ### 定义元组的键 最简单的情况是在元组的一个或多个字段上对元组进行分组: * [**Java**](#tab_java_1) * [**Scala**](#tab_scala_1) ``` DataStream> input = // [...] KeyedStream,Tuple> keyed = input.keyBy(0) ``` ``` val input: DataStream[(Int, String, Long)] = // [...] val keyed = input.keyBy(0) ``` 元组在第一个字段(整数类型)上分组。 * [**Java**](#tab_java_2) * [**Scala**](#tab_scala_2) ``` DataStream> input = // [...] KeyedStream,Tuple> keyed = input.keyBy(0,1) ``` ``` val input: DataSet[(Int, String, Long)] = // [...] val grouped = input.groupBy(0,1) ``` 在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。 关于嵌套元组的注释:如果你有一个带有嵌套元组的DataStream,例如: ``` DataStream,String,Long>> ds; ``` 指定`keyBy(0)`将导致系统使用full `Tuple2`作为键(以Integer和Float为键)。如果要“导航”到嵌套中`Tuple2`,则必须使用下面解释的字段表达式键。 ### 使用Field Expressions定义键 您可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连接或coGrouping的键。 字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如[Tuple](#tuples-and-case-classes)和[POJO](#pojos)类型。 * [**Java**](#tab_java_3) * [**Scala**](#tab_scala_3) 在下面的示例中,我们有一个`WC`POJO,其中包含两个字段“word”和“count”。要按字段分组`word`,我们只需将其名称传递给`keyBy()`函数即可。 ``` // some ordinary POJO (Plain old Java Object) public class WC { public String word; public int count; } DataStream words = // [...] DataStream wordCounts = words.keyBy("word").window(/*window specification*/); ``` **字段表达式语法**: * 按字段名称选择POJO字段。例如,`"user"`指POJO类型的“用户”字段。 * 按字段名称或0偏移字段索引选择元组字段。例如`"f0"`,分别`"5"`引用Java Tuple类型的第一个和第六个字段。 * 您可以在POJO和Tuples中选择嵌套字段。例如,`"user.zip"`指POJO的“zip”字段,其存储在POJO类型的“user”字段中。支持POJO和元组的任意嵌套和混合,例如`"f1.user.zip"`或`"user.f3.1.zip"`。 * 您可以使用`"*"`通配符表达式选择完整类型。这也适用于非Tuple或POJO类型的类型。 **字段表达示例**: ``` public static class WC { public ComplexNestedClass complex; //nested POJO private int count; // getter / setter for private field (count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3 word; public IntWritable hadoopCitizen; } ``` 这些是上面示例代码的有效字段表达式: * `"count"`:类中的count字段`WC`。 * `"complex"`:递归选择POJO类型的字段复合体的所有字段`ComplexNestedClass`。 * `"complex.word.f2"`:选择嵌套的最后一个字段`Tuple3`。 * `"complex.hadoopCitizen"`:选择Hadoop `IntWritable`类型。 在下面的示例中,我们有一个`WC`POJO,其中包含两个字段“word”和“count”。要按字段分组`word`,我们只需将其名称传递给`keyBy()`函数即可。 ``` // some ordinary POJO (Plain old Java Object) class WC(var word: String, var count: Int) { def this() { this("", 0L) } } val words: DataStream[WC] = // [...] val wordCounts = words.keyBy("word").window(/*window specification*/) // or, as a case class, which is less typing case class WC(word: String, count: Int) val words: DataStream[WC] = // [...] val wordCounts = words.keyBy("word").window(/*window specification*/) ``` **字段表达式语法**: * 按字段名称选择POJO字段。例如,`"user"`指POJO类型的“用户”字段。 * 通过1偏移字段名称或0偏移字段索引选择元组字段。例如`"_1"`,分别`"5"`引用Scala Tuple类型的第一个和第六个字段。 * 您可以在POJO和Tuples中选择嵌套字段。例如,`"user.zip"`指POJO的“zip”字段,其存储在POJO类型的“user”字段中。支持POJO和元组的任意嵌套和混合,例如`"_2.user.zip"`或`"user._4.1.zip"`。 * 您可以使用`"_"`通配符表达式选择完整类型。这也适用于非Tuple或POJO类型的类型。 **字段表达示例**: ``` class WC(var complex: ComplexNestedClass, var count: Int) { def this() { this(null, 0) } } class ComplexNestedClass( var someNumber: Int, someFloat: Float, word: (Long, Long, String), hadoopCitizen: IntWritable) { def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) } } ``` 这些是上面示例代码的有效字段表达式: * `"count"`:类中的count字段`WC`。 * `"complex"`:递归选择POJO类型的字段复合体的所有字段`ComplexNestedClass`。 * `"complex.word._3"`:选择嵌套的最后一个字段`Tuple3`。 * `"complex.hadoopCitizen"`:选择Hadoop `IntWritable`类型。 ### 使用键选择器函数定义键 定义键的另一种方法是“键选择器”函数。键选择器函数将单个数据元作为输入并返回数据元的键。Keys可以是任何类型,并且可以从确定性计算中导出。 以下示例显示了一个键选择器函数,它只返回一个对象的字段: * [**Java**](#tab_java_4) * [**Scala**](#tab_scala_4) ``` // some ordinary POJO public class WC {public String word; public int count;} DataStream words = // [...] KeyedStream keyed = words .keyBy(new KeySelector() { public String getKey(WC wc) { return wc.word; } }); ``` ``` // some ordinary case class case class WC(word: String, count: Int) val words: DataStream[WC] = // [...] val keyed = words.keyBy( _.word ) ``` ## 指定转换函数 大多数转换都需要用户定义的函数。本节列出了如何指定它们的不同方法 * [**Java**](#tab_java_5) * [**Scala**](#tab_scala_5) #### 实现接口 最基本的方法是实现一个提供的接口: ``` class MyMapFunction implements MapFunction { public Integer map(String value) { return Integer.parseInt(value); } }; data.map(new MyMapFunction()); ``` #### 匿名课程 您可以将函数作为匿名类传递: ``` data.map(new MapFunction () { public Integer map(String value) { return Integer.parseInt(value); } }); ``` #### Java 8 Lambdas Flink还支持Java API中的Java 8 Lambdas。 ``` data.filter(s -> s.startsWith("http://")); ``` ``` data.reduce((i1,i2) -> i1 + i2); ``` #### 函数丰富 需要用户定义函数的所有转换都可以将_富_函数作为参数。例如,而不是 ``` class MyMapFunction implements MapFunction { public Integer map(String value) { return Integer.parseInt(value); } }; ``` 你可以写 ``` class MyMapFunction extends RichMapFunction { public Integer map(String value) { return Integer.parseInt(value); } }; ``` 并像往常一样将函数传递给`map`转换: ``` data.map(new MyMapFunction()); ``` 丰富的函数也可以定义为匿名类: ``` data.map (new RichMapFunction() { public Integer map(String value) { return Integer.parseInt(value); } }); ``` #### Lambda函数 正如前面的例子中所见,所有 算子操作都接受lambda函数来描述 算子操作: ``` val data: DataSet[String] = // [...] data.filter { _.startsWith("http://") } ``` ``` val data: DataSet[Int] = // [...] data.reduce { (i1,i2) => i1 + i2 } // or data.reduce { _ + _ } ``` #### 函数丰富 将lambda函数作为参数的所有转换都可以将_富_函数作为参数。例如,而不是 ``` data.map { x => x.toInt } ``` 你可以写 ``` class MyMapFunction extends RichMapFunction[String, Int] { def map(in: String):Int = { in.toInt } }; ``` 并将函数传递给`map`转换: ``` data.map(new MyMapFunction()) ``` 丰富的函数也可以定义为匿名类: ``` data.map (new RichMapFunction[String, Int] { def map(in: String):Int = { in.toInt } }) ``` 丰富的函数提供,除了用户定义的函数(Map,Reduce等),四种方法:`open`,`close`,`getRuntimeContext`,和 `setRuntimeContext`。这些用于参数化函数(请参阅将[参数传递给函数](https://flink.sojb.cn/dev/batch/index.html#passing-parameters-to-functions)),创建和完成本地状态,访问广播变量(请参阅 [广播变量](https://flink.sojb.cn/dev/batch/index.html#broadcast-variables))以及访问运行时信息(如累加器和计数器)(请参阅 [累加器和计数器](#accumulators--counters))以及有关信息的信息。迭代(参见[迭代](https://flink.sojb.cn/dev/batch/iterations.html))。 ## 支持的数据类型 Flink对可以在DataSet或DataStream中的数据元类型进行了一些限制。原因是系统分析类型以确定有效的执行策略。 有六种不同类别的数据类型: 1. **Java元组**和**Scala案例类** 2. **Java POJO** 3. **原始类型** 4. **常规课程** 5. **值** 6. **Hadoop Writables** 7. **特殊类型** #### 元组和案例类 * [**Java**](#tab_java_6) * [**Scala**](#tab_scala_6) 元组是包含固定数量的具有各种类型的字段的复合类型。Java API提供`Tuple1`最多的类`Tuple25`。元组的每个字段都可以是包含更多元组的任意Flink类型,从而产生嵌套元组。可以使用字段名称直接访问元组的字段`tuple.f4`,或使用通用getter方法 `tuple.getField(int position)`。字段索引从0开始。请注意,这与Scala元组形成鲜明对比,但它与Java常规索引更为一致。 ``` DataStream> wordCounts = env.fromElements( new Tuple2("hello", 1), new Tuple2("world", 2)); wordCounts.map(new MapFunction, Integer>() { @Override public Integer map(Tuple2 value) throws Exception { return value.f1; } }); wordCounts.keyBy(0); // also valid .keyBy("f0") ``` Scala案例类(和Scala元组是案例类的特例)是包含固定数量的具有各种类型的字段的复合类型。元组字段通过其1偏移名称来寻址,例如`_1`第一个字段。案例类字段按名称访问。 ``` case class WordCount(word: String, count: Int) val input = env.fromElements( WordCount("hello", 1), WordCount("world", 2)) // Case Class Data Set input.keyBy("word")// key by field expression "word" val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set input2.keyBy(0, 1) // key by field positions 0 and 1 ``` #### POJOs 如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型: * 这堂课必须公开。 * 它必须有一个没有参数的公共构造函数(默认构造函数)。 * 所有字段都是公共的,或者必须通过getter和setter函数访问。对于一个名为`foo`getter和setter方法的字段必须命名`getFoo()`和`setFoo()`。 * Flink必须支持字段的类型。目前,Flink使用[Avro](http://avro.apache.org)序列化任意对象(例如`Date`)。 Flink分析了POJO类型的结构,即它了解了POJO的字段。因此,POJO类型比一般类型更容易使用。此外,Flink可以比一般类型更有效地处理POJO。 以下示例显示了一个包含两个公共字段的简单POJO。 * [**Java**](#tab_java_7) * [**Scala**](#tab_scala_7) ``` public class WordWithCount { public String word; public int count; public WordWithCount() {} public WordWithCount(String word, int count) { this.word = word; this.count = count; } } DataStream wordCounts = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2)); wordCounts.keyBy("word"); // key by field expression "word" ``` ``` class WordWithCount(var word: String, var count: Int) { def this() { this(null, -1) } } val input = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2)) // Case Class Data Set input.keyBy("word")// key by field expression "word" ``` #### 原始类型 Flink支持所有Java和Scala的原始类型,如`Integer`,`String`和`Double`。 #### 一般类别 Flink支持大多数Java和Scala类(API和自定义)。限制适用于包含无法序列化的字段的类,如文件指针,I / O流或其他本机资源。遵循Java Beans约定的类通常可以很好地工作。 所有未标识为POJO类型的类(请参阅上面的POJO要求)都由Flink作为常规类类型处理。Flink将这些数据类型视为黑盒子,并且无法访问其内容(即,用于有效排序)。使用序列化框架[Kryo](https://github.com/EsotericSoftware/kryo)对常规类型进行反序列化。 #### 值 _值_类型手动描述它们的序列化和反序列化。它们不是通过通用序列化框架,而是通过`org.apache.flinktypes.Value`使用方法`read`和实现接口为这些 算子操作提供自定义代码`write`。当通用序列化效率非常低时,使用值类型是合理的。一个示例是将数据元的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零数据元使用特殊编码,而通用序列化只需编写所有数组数据元。 该`org.apache.flinktypes.CopyableValue`接口以类似的方式支持手动内部克隆逻辑。 Flink带有与基本数据类型对应的预定义值类型。(`ByteValue`, `ShortValue`,`IntValue`,`LongValue`,`FloatValue`,`DoubleValue`,`StringValue`,`CharValue`, `BooleanValue`)。这些Value类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并从垃圾收集器中消除压力。 #### Hadoop Writables 您可以使用实现该`org.apache.hadoop.Writable`接口的类型。`write()`和`readFields()`方法中定义的序列化逻辑将用于序列化。 #### 特殊类型 您可以使用特殊类型,包括Scala的`Either`,`Option`和`Try`。Java API有自己的自定义实现`Either`。与Scala类似`Either`,它代表两种可能类型的值,_左_或_右_。 `Either`可用于错误处理或需要输出两种不同类型记录的 算子。 #### 类型擦除和类型推断 _注意:本节仅适用于Java。_ Java编译器在编译后抛弃了大部分泛型类型信息。这在Java中称为_类型擦除_。这意味着在运行时,对象的实例不再知道其泛型类型。例如,JVM的实例`DataStream<String>`和`DataStream<Long>`外观相同。 Flink在准备执行程序时(当调用程序的主要方法时)需要类型信息。Flink Java API尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和 算子中。您可以通过检索类型`DataStream.getType()`。该方法返回一个实例`TypeInformation`,这是Flink表示类型的内部方式。 类型推断有其局限性,在某些情况下需要程序员的“合作”。这方面的示例是从集合创建数据集的方法,例如 `ExecutionEnvironment.fromCollection(),`您可以传递描述类型的参数的位置。但是通用函数`MapFunction<I, O>`也可能需要额外的类型信息。 该 [ResultTypeQueryable](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java) 接口可以通过输入格式和函数来实现明确地告诉API他们的返回类型。调用函数的_输入类型_通常可以通过先前 算子操作的结果类型来推断。 ## 累加器和计数器 累加器是具有**添加 算子操作**和**最终累积结果的**简单构造,可在作业结束后使用。 最直接的累加器是一个**计数器**:您可以使用该`Accumulator.add(V value)`方法递增它 。在工作结束时,Flink将汇总(合并)所有部分结果并将结果发送给客户。在调试过程中,或者如果您想快速了解有关数据的更多信息,累加器非常有用。 Flink目前有以下**内置累加器**。它们中的每一个都实现了 [Accumulator](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java) 接口。 * [**IntCounter**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java), [**LongCounter**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java) 和[ **DoubleCounter**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java):请参阅下面的使用计数器的示例。 * [**直方图**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java):离散数量的区间的直方图实现。在内部,它只是一个从Integer到Integer的映射。您可以使用它来计算值的分布,例如字数统计程序的每行字数的分布。 **如何使用累加器:** 首先,您必须在要使用它的用户定义转换函数中创建累加器对象(此处为计数器)。 ``` private IntCounter numLines = new IntCounter(); ``` 其次,您必须注册累加器对象,通常在_富_函数的`open()`方法中 。在这里您还可以定义名称。 ``` getRuntimeContext().addAccumulator("num-lines", this.numLines); ``` 您现在可以在 算子函数中的任何位置使用累加器,包括在`open()`和 `close()`方法中。 ``` this.numLines.add(1); ``` 整个结果将存储在`JobExecutionResult`从`execute()`运行环境的方法返回的对象中(当前这仅在执行等待作业完成时才有效)。 ``` myJobExecutionResult.getAccumulatorResult("num-lines") ``` 所有累加器每个作业共享一个命名空间。因此,您可以在作业的不同算子函数中使用相同的累加器。Flink将在内部合并所有具有相同名称的累加器。 关于累加器和迭代的注释:目前累加器的结果仅在整个作业结束后才可用。我们还计划在下一次迭代中使前一次迭代的结果可用。您可以使用 [聚合器](https://github.com/apache/flink/blob/master//flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98) 来计算每次迭代统计信息,并根据此类统计信息确定迭代的终止。 **定制累加器:** 要实现自己的累加器,只需编写Accumulator接口的实现即可。如果您认为自定义累加器应与Flink一起提供,请随意创建拉取请求。 您可以选择实现 [Accumulator](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java) 或[SimpleAccumulator](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java)。 `Accumulator<V,R>`最灵活:它定义`V`要添加的值的类型`R`,以及最终结果的结果类型。例如,对于直方图,`V`是数字并且`R`是直方图。`SimpleAccumulator`适用于两种类型相同的情况,例如计数器。