# 视窗 > 译者:[flink.sojb.cn](https://flink.sojb.cn/) Windows是处理无限流的核心。Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。本文档重点介绍如何在Flink中执行窗口,以及程序员如何从其提供的函数中获益最大化。 窗口Flink程序的一般结构如下所示。第一个片段指的是_被Keys化_流,而第二个片段指的_是非__被Keys化_流。正如人们所看到的,唯一的区别是`keyBy(...)`呼吁Keys流和`window(...)`成为`windowAll(...)`非被Key化的数据流。这也将作为页面其余部分的路线图。 **被Keys化Windows** ``` stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag" ``` **非被Keys化Windows** ``` stream .windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag" ``` 在上面,方括号([...])中的命令是可选的。这表明Flink允许您以多种不同方式自定义窗口逻辑,以便最适合您的需求。 ## 窗口生命周期 简而言之,只要应该属于此窗口的第一个数据元到达,就会**创建**一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定 时,窗口将被**完全删除**`allowed lateness`(请参阅[允许的延迟)](#allowed-lateness))。Flink保证仅删除基于时间的窗口而不是其他类型,_例如_全局窗口(请参阅[窗口分配器](#window-assigners))。例如,使用基于事件时间的窗口策略,每5分钟创建一个非重叠(或翻滚)的窗口,并允许延迟1分钟,Flink将创建一个新窗口,用于间隔`12:00`和`12:05`当具有落入此间隔的时间戳的第一个数据元到达时,当水印通过`12:06` 时间戳时它将删除它。 此外,每个窗口将具有`Trigger`(参见[触发器](#triggers))和一个函数(`ProcessWindowFunction`,`ReduceFunction`, `AggregateFunction`或`FoldFunction`)(见[窗口函数](#window-functions))连接到它。该函数将包含要应用于窗口内容的计算,而`Trigger`指定窗口被认为准备好应用该函数的条件。触发策略可能类似于“当窗口中的数据元数量大于4”时,或“当水印通过窗口结束时”。触发器还可以决定在创建和删除之间的任何时间清除窗口的内容。在这种情况下,清除仅指窗口中的数据元,而_不是_窗口元数据。这意味着仍然可以将新数据添加到该窗口。 除了上述内容之外,您还可以指定一个`Evictor`(参见[Evictors](#evictors)),它可以在触发器触发后以及应用函数之前和/或之后从窗口中删除数据元。 在下文中,我们将详细介绍上述每个组件。在转到可选部分之前,我们从上面代码段中的必需部分开始(请参阅[被Keys化与非被Keys化窗口](#keyed-vs-non-keyed-windows),[窗口分配器](#window-assigner)和 [窗口函数](#window-function))。 ## 被Keys化与非被Keys化Windows 要指定的第一件事是您的流是否应该键入。必须在定义窗口之前完成此 算子操作。使用the `keyBy(...)`将您的无限流分成逻辑被Key化的数据流。如果`keyBy(...)`未调用,则表示您的流不是被Keys化的。 对于被Key化的数据流,可以将传入事件的任何属性用作键([此处有](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)更多详细信息)。拥有被Key化的数据流将允许您的窗口计算由多个任务并行执行,因为每个逻辑被Key化的数据流可以独立于其余任务进行处理。引用相同Keys的所有数据元将被发送到同一个并行任务。 在非被Key化的数据流的情况下,您的原始流将不会被拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,_即_并行度为1。 ## 窗口分配器 指定您的流是否已键入后,下一步是定义一个_窗口分配器_。窗口分配器定义如何将数据元分配给窗口。这是通过`WindowAssigner` 在`window(...)`(对于_被Keys化_流)或`windowAll()`(对于_非被Keys化_流)调用中指定您的选择来完成的。 A `WindowAssigner`负责将每个传入数据元分配给一个或多个窗口。Flink带有预定义的窗口分配器,用于最常见的用例,即_翻滚窗口_, _滑动窗口_,_会话窗口_和_全局窗口_。您还可以通过扩展`WindowAssigner`类来实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配数据元,这可以是处理时间或事件时间。请查看我们关于[活动时间](https://flink.sojb.cn/dev/event_time.html)的部分,了解处理时间和事件时间之间的差异以及时间戳和水印的生成方式。 基于时间的窗口具有_开始时间戳_(包括)和_结束时间戳_(不包括),它们一起描述窗口的大小。在代码中,Flink在使用`TimeWindow`基于时间的窗口时使用,该窗口具有查询开始和结束时间戳的方法`maxTimestamp()`,以及返回给定窗口的最大允许时间戳的附加方法。 在下文中,我们将展示Flink的预定义窗口分配器如何工作以及如何在DataStream程序中使用它们。下图显示了每个分配者的工作情况。紫色圆圈表示流的数据元,这些数据元由某个键(在这种情况下是_用户1_,_用户2_和_用户3_)划分。x轴显示时间的进度。 ### 翻滚的Windows 一个_翻滚窗口_分配器的每个数据元分配给指定的窗口_的窗口大小_。翻滚窗具有固定的尺寸,不重叠。例如,如果指定大小为5分钟的翻滚窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口,如下图所示。 ![](img/tumbling-windows.svg) 以下代码段显示了如何使用翻滚窗口。 * [**Java**](#tab_java_0) * [**Scala**](#tab_scala_0) ``` DataStream input = ...; // tumbling event-time windows input .keyBy() .window(TumblingEventTimeWindows.of(Time.seconds(5))) .(); // tumbling processing-time windows input .keyBy() .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .(); // daily tumbling event-time windows offset by -8 hours. input .keyBy() .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .(); ``` ``` val input: DataStream[T] = ... // tumbling event-time windows input .keyBy() .window(TumblingEventTimeWindows.of(Time.seconds(5))) .() // tumbling processing-time windows input .keyBy() .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .() // daily tumbling event-time windows offset by -8 hours. input .keyBy() .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .() ``` 时间间隔可以通过使用一个指定`Time.milliseconds(x)`,`Time.seconds(x)`, `Time.minutes(x)`,等等。 如上一个示例所示,翻滚窗口分配器还采用可选`offset` 参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时翻滚窗口划时代对齐,这是你会得到如窗口 `1:00:00.000 - 1:59:59.999`,`2:00:00.000 - 2:59:59.999`等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿 `1:15:00.000 - 2:14:59.999`,`2:15:00.000 - 3:14:59.999`等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量`Time.hours(-8)`。 ### 滑动窗口 该_滑动窗口_分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,_窗口大小_由_窗口大小_参数配置。附加的_窗口滑动_参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,数据元被分配给多个窗口。 例如,您可以将大小为10分钟的窗口滑动5分钟。有了这个,你每隔5分钟就会得到一个窗口,其中包含过去10分钟内到达的事件,如下图所示。 ![](img/sliding-windows.svg) 以下代码段显示了如何使用滑动窗口。 * [**Java**](#tab_java_1) * [**Scala**](#tab_scala_1) ``` DataStream input = ...; // sliding event-time windows input .keyBy() .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .(); // sliding processing-time windows input .keyBy() .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .(); // sliding processing-time windows offset by -8 hours input .keyBy() .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .(); ``` ``` val input: DataStream[T] = ... // sliding event-time windows input .keyBy() .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .() // sliding processing-time windows input .keyBy() .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .() // sliding processing-time windows offset by -8 hours input .keyBy() .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .() ``` 时间间隔可以通过使用一个指定`Time.milliseconds(x)`,`Time.seconds(x)`, `Time.minutes(x)`,等等。 如上一个示例所示,滑动窗口分配器还采用可选`offset`参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时窗口半小时滑动与时代一致,那就是你会得到如窗口 `1:00:00.000 - 1:59:59.999`,`1:30:00.000 - 2:29:59.999`等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿 `1:15:00.000 - 2:14:59.999`,`1:45:00.000 - 2:44:59.999`等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量`Time.hours(-8)`。 ### 会话窗口 在_会话窗口_中按活动会话分配器组中的数据元。与_翻滚窗口_和_滑动窗口_相比,会话窗口不重叠并且没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到数据元时,_即_当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态_会话间隙_或 _会话间隙提取器_函数,该函数定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续数据元将分配给新的会话窗口。 ![](img/session-windows.svg) 以下代码段显示了如何使用会话窗口。 * [**Java**](#tab_java_2) * [**Scala**](#tab_scala_2) ``` DataStream input = ...; // event-time session windows with static gap input .keyBy() .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .(); // event-time session windows with dynamic gap input .keyBy() .window(EventTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) .(); // processing-time session windows with static gap input .keyBy() .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .(); // processing-time session windows with dynamic gap input .keyBy() .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) .(); ``` ``` val input: DataStream[T] = ... // event-time session windows with static gap input .keyBy() .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .() // event-time session windows with dynamic gap input .keyBy() .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] { override def extract(element: String): Long = { // determine and return session gap } })) .() // processing-time session windows with static gap input .keyBy() .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .() // processing-time session windows with dynamic gap input .keyBy() .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] { override def extract(element: String): Long = { // determine and return session gap } })) .() ``` 静态间隙可以通过使用中的一个来指定`Time.milliseconds(x)`,`Time.seconds(x)`, `Time.minutes(x)`,等。 通过实现`SessionWindowTimeGapExtractor`接口指定动态间隙。 注意由于会话窗口没有固定的开始和结束,因此它们的评估方式与翻滚和滑动窗口不同。在内部,会话窗口算子为每个到达的记录创建一个新窗口,如果它们彼此之间的距离比定义的间隙更接近,则将窗口合并在一起。为了可合并的,会话窗口 算子操作者需要一个合并[触发器](#triggers)和一个合并 [的窗函数](#window-functions),如`ReduceFunction`,`AggregateFunction`,或`ProcessWindowFunction` (`FoldFunction`不能合并。) ### 全局Windows 一个_全局性的窗口_分配器分配使用相同的Keys相同的单个的所有数据元_全局窗口_。此窗口方案仅在您还指定自定义[触发器](#triggers)时才有用。否则,将不执行任何计算,因为全局窗口没有我们可以处理聚合数据元的自然结束。 ![](img/non-windowed.svg) 以下代码段显示了如何使用全局窗口。 * [**Java**](#tab_java_3) * [**Scala**](#tab_scala_3) ``` DataStream input = ...; input .keyBy() .window(GlobalWindows.create()) .(); ``` ``` val input: DataStream[T] = ... input .keyBy() .window(GlobalWindows.create()) .() ``` ## 窗口函数 定义窗口分配器后,我们需要指定要在每个窗口上执行的计算。这是_窗口函数_的职责,_窗口函数_用于在系统确定窗口准备好进行处理后处理每个(可能是被Keys化的)窗口的数据元(请参阅Flink如何确定窗口何时准备好的[触发器](#triggers))。 的窗函数可以是一个`ReduceFunction`,`AggregateFunction`,`FoldFunction`或`ProcessWindowFunction`。前两个可以更有效地执行(参见[State Size](#state size)部分),因为Flink可以在每个窗口到达时递增地聚合它们的数据元。A `ProcessWindowFunction`获取`Iterable`窗口中包含的所有数据元以及有关数据元所属窗口的其他元信息。 具有a的窗口转换`ProcessWindowFunction`不能像其他情况一样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的_所有_数据元。这可以通过组合来减轻`ProcessWindowFunction`与`ReduceFunction`,`AggregateFunction`或`FoldFunction`以获得两个窗口元件的增量聚合并且该附加元数据窗口 `ProcessWindowFunction`接收。我们将查看每个变体的示例。 ### ReduceFunction A `ReduceFunction`指定如何组合输入中的两个数据元以生成相同类型的输出数据元。Flink使用a `ReduceFunction`来递增地聚合窗口的数据元。 A `ReduceFunction`可以像这样定义和使用: * [**Java**](#tab_java_4) * [**Scala**](#tab_scala_4) ``` DataStream> input = ...; input .keyBy() .window() .reduce(new ReduceFunction> { public Tuple2 reduce(Tuple2 v1, Tuple2 v2) { return new Tuple2<>(v1.f0, v1.f1 + v2.f1); } }); ``` ``` val input: DataStream[(String, Long)] = ... input .keyBy() .window() .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) } ``` 上面的示例总结了窗口中所有数据元的元组的第二个字段。 ### 聚合函数 An `AggregateFunction`是一个通用版本,`ReduceFunction`它有三种类型:输入类型(`IN`),累加器类型(`ACC`)和输出类型(`OUT`)。输入类型是输入流中数据元的类型,并且`AggregateFunction`具有将一个输入数据元添加到累加器的方法。该接口还具有用于创建初始累加器的方法,用于将两个累加器合并到一个累加器中以及用于`OUT`从累加器提取输出(类型)。我们将在下面的示例中看到它的工作原理。 与之相同`ReduceFunction`,Flink将在窗口到达时递增地聚合窗口的输入数据元。 一个`AggregateFunction`可以被定义并这样使用: * [**Java**](#tab_java_5) * [**Scala**](#tab_scala_5) ``` /** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */ private static class AverageAggregate implements AggregateFunction, Tuple2, Double> { @Override public Tuple2 createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2 add(Tuple2 value, Tuple2 accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2 accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2 merge(Tuple2 a, Tuple2 b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } DataStream> input = ...; input .keyBy() .window() .aggregate(new AverageAggregate()); ``` ``` /** * The accumulator is used to keep a running sum and a count. The [getResult] method * computes the average. */ class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] { override def createAccumulator() = (0L, 0L) override def add(value: (String, Long), accumulator: (Long, Long)) = (accumulator._1 + value._2, accumulator._2 + 1L) override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2 override def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2) } val input: DataStream[(String, Long)] = ... input .keyBy() .window() .aggregate(new AverageAggregate) ``` 上面的示例计算窗口中数据元的第二个字段的平均值。 ### FoldFunction A `FoldFunction`指定窗口的输入数据元如何与输出类型的数据元组合。所述`FoldFunction`递增称为该被添加到窗口和电流输出值的每个数据元。第一个数据元与输出类型的预定义初始值组合。 A `FoldFunction`可以像这样定义和使用: * [**Java**](#tab_java_6) * [**Scala**](#tab_scala_6) ``` DataStream> input = ...; input .keyBy() .window() .fold("", new FoldFunction, String>> { public String fold(String acc, Tuple2 value) { return acc + value.f1; } }); ``` ``` val input: DataStream[(String, Long)] = ... input .keyBy() .window() .fold("") { (acc, v) => acc + v._2 } ``` 上面的示例将所有输入`Long`值附加到最初为空`String`。 注意 `fold()`不能与会话窗口或其他可合并窗口一起使用。 ### ProcessWindowFunction ProcessWindowFunction获取包含窗口的所有数据元的Iterable,以及可访问时间和状态信息的Context对象,这使其能够提供比其他窗口函数更多的灵活性。这是以性能和资源消耗为代价的,因为数据元不能以递增方式聚合,而是需要在内部进行缓冲,直到窗口被认为已准备好进行处理。 `ProcessWindowFunction`外观签名如下: * [**Java**](#tab_java_7) * [**Scala**](#tab_scala_7) ``` public abstract class ProcessWindowFunction implements Function { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param context The context in which the window is being evaluated. * @param elements The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ public abstract void process( KEY key, Context context, Iterable elements, Collector out) throws Exception; /** * The context holding window metadata. */ public abstract class Context implements java.io.Serializable { /** * Returns the window that is being evaluated. */ public abstract W window(); /** Returns the current processing time. */ public abstract long currentProcessingTime(); /** Returns the current event-time watermark. */ public abstract long currentWatermark(); /** * State accessor for per-key and per-window state. * *

NOTE:If you use per-window state you have to ensure that you clean it up * by implementing {@link ProcessWindowFunction#clear(Context)}. */ public abstract KeyedStateStore windowState(); /** * State accessor for per-key global state. */ public abstract KeyedStateStore globalState(); } } ``` ``` abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param context The context in which the window is being evaluated. * @param elements The elements in the window being evaluated. * @param out A collector for emitting elements. * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ def process( key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]) /** * The context holding window metadata */ abstract class Context { /** * Returns the window that is being evaluated. */ def window: W /** * Returns the current processing time. */ def currentProcessingTime: Long /** * Returns the current event-time watermark. */ def currentWatermark: Long /** * State accessor for per-key and per-window state. */ def windowState: KeyedStateStore /** * State accessor for per-key global state. */ def globalState: KeyedStateStore } } ``` 注意该`key`参数是通过`KeySelector`为`keyBy()`调用指定的Keys提取的Keys。在元组索引键或字符串字段引用的情况下,此键类型始终是`Tuple`,您必须手动将其转换为正确大小的元组以提取键字段。 A `ProcessWindowFunction`可以像这样定义和使用: * [**Java**](#tab_java_8) * [**Scala**](#tab_scala_8) ``` DataStream> input = ...; input .keyBy(t -> t.f0) .timeWindow(Time.minutes(5)) .process(new MyProcessWindowFunction()); /* ... */ public class MyProcessWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> { @Override public void process(String key, Context context, Iterable> input, Collector out) { long count = 0; for (Tuple2 in: input) { count++; } out.collect("Window: " + context.window() + "count: " + count); } } ``` ``` val input: DataStream[(String, Long)] = ... input .keyBy(_._1) .timeWindow(Time.minutes(5)) .process(new MyProcessWindowFunction()) /* ... */ class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] { def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = { var count = 0L for (in <- input) { count = count + 1 } out.collect(s"Window ${context.window} count: $count") } } ``` 该示例显示了`ProcessWindowFunction`对窗口中的数据元进行计数的情况。此外,窗口函数将有关窗口的信息添加到输出。 注意注意,使用`ProcessWindowFunction`简单的聚合(例如count)是非常低效的。下一节将介绍a `ReduceFunction`或如何`AggregateFunction`与a结合使用`ProcessWindowFunction`以获得增量聚合和a的附加信息`ProcessWindowFunction`。 ### ProcessWindowFunction with Incremental Aggregation 当a 数据元到达窗口时,A `ProcessWindowFunction`可以与a `ReduceFunction`,an `AggregateFunction`或a组合`FoldFunction`以递增地聚合数据元。当窗口关闭时,`ProcessWindowFunction`将提供聚合结果。这允许它在访问附加窗口元信息的同时递增地计算窗口`ProcessWindowFunction`。 注意您也可以使用旧版`WindowFunction`而不是 `ProcessWindowFunction`增量窗口聚合。 #### 使用ReduceFunction增量窗口聚合 以下示例显示如何将增量`ReduceFunction`与a组合`ProcessWindowFunction`以返回窗口中的最小事件以及窗口的开始时间。 * [**Java**](#tab_java_9) * [**Scala**](#tab_scala_9) ``` DataStream input = ...; input .keyBy() .timeWindow() .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); // Function definitions private static class MyReduceFunction implements ReduceFunction { public SensorReading reduce(SensorReading r1, SensorReading r2) { return r1.value() > r2.value() ? r2 : r1; } } private static class MyProcessWindowFunction extends ProcessWindowFunction, String, TimeWindow> { public void process(String key, Context context, Iterable minReadings, Collector> out) { SensorReading min = minReadings.iterator().next(); out.collect(new Tuple2(window.getStart(), min)); } } ``` ``` val input: DataStream[SensorReading] = ... input .keyBy() .timeWindow() .reduce( (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 }, ( key: String, window: TimeWindow, minReadings: Iterable[SensorReading], out: Collector[(Long, SensorReading)] ) => { val min = minReadings.iterator.next() out.collect((window.getStart, min)) } ) ``` #### 使用AggregateFunction进行增量窗口聚合 以下示例显示如何将增量`AggregateFunction`与a组合`ProcessWindowFunction`以计算平均值,并同时发出键和窗口以及平均值。 * [**Java**](#tab_java_10) * [**Scala**](#tab_scala_10) ``` DataStream> input = ...; input .keyBy() .timeWindow() .aggregate(new AverageAggregate(), new MyProcessWindowFunction()); // Function definitions /** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */ private static class AverageAggregate implements AggregateFunction, Tuple2, Double> { @Override public Tuple2 createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2 add(Tuple2 value, Tuple2 accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2 accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2 merge(Tuple2 a, Tuple2 b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } private static class MyProcessWindowFunction extends ProcessWindowFunction, String, TimeWindow> { public void process(String key, Context context, Iterable averages, Collector> out) { Double average = averages.iterator().next(); out.collect(new Tuple2<>(key, average)); } } ``` ``` val input: DataStream[(String, Long)] = ... input .keyBy() .timeWindow() .aggregate(new AverageAggregate(), new MyProcessWindowFunction()) // Function definitions /** * The accumulator is used to keep a running sum and a count. The [getResult] method * computes the average. */ class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] { override def createAccumulator() = (0L, 0L) override def add(value: (String, Long), accumulator: (Long, Long)) = (accumulator._1 + value._2, accumulator._2 + 1L) override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2 override def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2) } class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] { def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double]): () = { val average = averages.iterator.next() out.collect((key, average)) } } ``` #### 使用FoldFunction进行增量窗口聚合 以下示例显示如何将增量`FoldFunction`与a组合`ProcessWindowFunction`以提取窗口中的事件数,并返回窗口的键和结束时间。 * [**Java**](#tab_java_11) * [**Scala**](#tab_scala_11) ``` DataStream input = ...; input .keyBy() .timeWindow() .fold(new Tuple3("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction()) // Function definitions private static class MyFoldFunction implements FoldFunction > { public Tuple3 fold(Tuple3 acc, SensorReading s) { Integer cur = acc.getField(2); acc.setField(cur + 1, 2); return acc; } } private static class MyProcessWindowFunction extends ProcessWindowFunction, Tuple3, String, TimeWindow> { public void process(String key, Context context, Iterable> counts, Collector> out) { Integer count = counts.iterator().next().getField(2); out.collect(new Tuple3(key, context.window().getEnd(),count)); } } ``` ``` val input: DataStream[SensorReading] = ... input .keyBy() .timeWindow() .fold ( ("", 0L, 0), (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) }, ( key: String, window: TimeWindow, counts: Iterable[(String, Long, Int)], out: Collector[(String, Long, Int)] ) => { val count = counts.iterator.next() out.collect((key, window.getEnd, count._3)) } ) ``` ### 在ProcessWindowFunction中使用每窗口状态 除了访问被Keys化状态(如任何丰富的函数可以),a `ProcessWindowFunction`还可以使用被Keys化状态,该被Keys化状态的作用域是函数当前正在处理的窗口。在这种情况下,了解_每个窗口_状态所指的窗口是很重要的。涉及不同的“窗口”: * 指定窗口 算子操作时定义的窗口:这可能是_1小时的翻滚窗口_或_滑动__1小时__的2小时滑动窗口_。 * 给定键的已定义窗口的实际实例:_对于user-id xyz,_这可能是_从12:00到13:00的时间窗口_。这基于窗口定义,并且将基于作业当前正在处理的键的数量以及基于事件落入的时隙而存在许多窗口。 每窗口状态与后两者相关联。这意味着如果我们处理1000个不同键的事件,并且所有这些事件的事件当前都落入_[12:00,13:00]_时间窗口,那么将有1000个窗口实例,每个窗口实例都有自己的按键每窗口状态。 调用接收的`Context`对象有两种方法`process()`允许访问两种类型的状态: * `globalState()`,允许访问没有作用于窗口的被Keys化状态 * `windowState()`,允许访问也限定在窗口范围内的被Keys化状态 如果您预计同一窗口会多次触发,则此函数非常有用,如果您对迟到的数据进行后期触发或者您有自定义触发器进行推测性早期触发时可能会发生这种情况。在这种情况下,您将存储有关先前点火的信息或每个窗口状态的点火次数。 使用窗口状态时,清除窗口时清除该状态也很重要。这应该在`clear()`方法中发生。 ### WindowFunction(留存) 在一些`ProcessWindowFunction`可以使用的地方你也可以使用`WindowFunction`。这是较旧版本`ProcessWindowFunction`,提供较少的上下文信息,并且没有一些高级函数,例如每窗口被Keys化状态。此接口将在某个时候弃用。 `WindowFunction`外观的签名如下: * [**Java**](#tab_java_12) * [**Scala**](#tab_scala_12) ``` public interface WindowFunction extends Function, Serializable { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param window The window that is being evaluated. * @param input The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ void apply(KEY key, W window, Iterable input, Collector out) throws Exception; } ``` ``` trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param window The window that is being evaluated. * @param input The elements in the window being evaluated. * @param out A collector for emitting elements. * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]) } ``` 它可以像这样使用: * [**Java**](#tab_java_13) * [**Scala**](#tab_scala_13) ``` DataStream> input = ...; input .keyBy() .window() .apply(new MyWindowFunction()); ``` ``` val input: DataStream[(String, Long)] = ... input .keyBy() .window() .apply(new MyWindowFunction()) ``` ## 触发器 A `Trigger`确定何时_窗口函数_准备好处理窗口(由_窗口分配器_形成)。每个都有默认值。如果默认触发器不符合您的需要,您可以使用指定自定义触发器。`WindowAssigner``Trigger``trigger(...)` 触发器界面有五种方法可以`Trigger`对不同的事件做出反应: * `onElement()`为添加到窗口的每个数据元调用该方法。 * `onEventTime()`在注册的事件时间计时器触发时调用该方法。 * `onProcessingTime()`在注册的处理时间计时器触发时调用该方法。 * 该`onMerge()`方法与状态触发器相关,并且当它们的相应窗口合并时合并两个触发器的状态,_例如_当使用会话窗口时。 * 最后,该`clear()`方法在移除相应窗口时执行所需的任何动作。 关于上述方法需要注意两点: 1)前三个决定如何通过返回a来对其调用事件进行 算子操作`TriggerResult`。该 算子操作可以是以下之一: * `CONTINUE`: 没做什么, * `FIRE`:触发​​计算, * `PURGE`:清除窗口中的数据元,和 * `FIRE_AND_PURGE`:触发​​计算并清除窗口中的数据元。 2)这些方法中的任何一种都可用于注册处理或事件时间计时器以用于将来的 算子操作。 ### 火与清除 一旦触发器确定窗口已准备好进行处理,它就会触发,_即_它返回`FIRE`或`FIRE_AND_PURGE`。这是窗口算子发出当前窗口结果的信号。给定一个窗口,将`ProcessWindowFunction` 所有数据元传递给`ProcessWindowFunction`(可能在将它们传递给逐出器后)。窗口`ReduceFunction`,`AggregateFunction`或`FoldFunction`简单地发出他们急切地汇总结果。 当触发器触发时,它可以`FIRE`或者`FIRE_AND_PURGE`。虽然`FIRE`保持了窗口的内容,`FIRE_AND_PURGE`删除其内容。默认情况下,预先实现的触发器只是`FIRE`没有清除窗口状态。 注意清除将简单地删除窗口的内容,并将保存有关窗口和任何触发状态的任何潜在元信息。 ### WindowAssigners的默认触发器 默认`Trigger`的`WindowAssigner`是适用于许多使用情况。例如,所有事件时间窗口分配器都具有`EventTimeTrigger`默认触发器。一旦水印通过窗口的末端,该触发器就会触发。 注意默认触发器`GlobalWindow`是`NeverTrigger`从不触发的。因此,在使用时必须定义自定义触发器`GlobalWindow`。 注意通过使用`trigger()`您指定触发器会覆盖a的默认触发器`WindowAssigner`。例如,如果指定a `CountTrigger`,`TumblingEventTimeWindows`则不再根据时间进度获取窗口,而是仅按计数。现在,如果你想根据时间和数量做出反应,你必须编写自己的自定义触发器。 ### 内置和自定义触发器 Flink附带了一些内置触发器。 * (已经提到的)`EventTimeTrigger`基于水印测量的事件时间的进展而触发。 * 在`ProcessingTimeTrigger`基于处理时间的火灾。 * `CountTrigger`一旦窗口中的数据元数量超过给定限制,就会触发。 * 将`PurgingTrigger`另一个触发器作为参数作为参数并将其转换为清除触发器。 如果需要实现自定义触发器,则应该检查抽象 [Trigger](https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java)类。请注意,API仍在不断发展,可能会在Flink的未来版本中发生变化。 ## 逐出器 Flink的窗口模型允许指定`Evictor`除了`WindowAssigner`和之外的可选项`Trigger`。这可以使用`evictor(...)`方法(在本文档的开头显示)来完成。所述逐出器必须从一个窗口中删除数据元的能力_之后_触发器触发和_之前和/或之后_被施加的窗口函数。为此,该`Evictor`接口有两种方法: ``` /** * Optionally evicts elements. Called before windowing function. * * @param elements The elements currently in the pane. * @param size The current number of elements in the pane. * @param window The {@link Window} * @param evictorContext The context for the Evictor */ void evictBefore(Iterable> elements, int size, W window, EvictorContext evictorContext); /** * Optionally evicts elements. Called after windowing function. * * @param elements The elements currently in the pane. * @param size The current number of elements in the pane. * @param window The {@link Window} * @param evictorContext The context for the Evictor */ void evictAfter(Iterable> elements, int size, W window, EvictorContext evictorContext); ``` 在`evictBefore()`包含窗口函数之前被施加驱逐逻辑,而`evictAfter()` 包含窗口函数之后要施加的一个。在应用窗口函数之前被逐出的数据元将不会被处理。 Flink带有三个预先实施的驱逐者。这些是: * `CountEvictor`:保持窗口中用户指定数量的数据元,并从窗口缓冲区的开头丢弃剩余的数据元。 * `DeltaEvictor`:取a `DeltaFunction`和a `threshold`,计算窗口缓冲区中最后一个数据元与其余每个数据元之间的差值,并删除delta大于或等于阈值的值。 * `TimeEvictor`:以`interval`毫秒为单位作为参数,对于给定窗口,它查找`max_ts`其数据元中的最大时间戳,并删除时间戳小于的所有数据元`max_ts - interval`。 默认默认情况下,所有预先实现的驱逐程序在窗口函数之前应用它们的逻辑。 注意指定逐出器会阻止任何预聚合,因为在应用计算之前,必须将窗口的所有数据元传递给逐出器。 注意 Flink不保证窗口中数据元的顺序。这意味着尽管逐出器可以从窗口的开头移除数据元,但这些数据元不一定是首先或最后到达的数据元。 ## 允许迟到 当使用_事件时间_窗口时,可能会发生数据元迟到的情况,_即_ Flink用于跟踪事件时间进度的水印已经超过数据元所属的窗口的结束时间戳。查看 [事件时间](https://flink.sojb.cn/dev/event_time.html),特别是[后期数据元,](https://flink.sojb.cn/dev/event_time.html#late-elements)以便更全面地讨论Flink如何处理事件时间。 默认情况下,当水印超过窗口末尾时,会删除延迟数据元。但是,Flink允许为窗口 算子指定最大_允许延迟_。允许延迟指定数据元在被删除之前可以延迟多少时间,并且其默认值为0.在水印通过窗口结束之后但在通过窗口结束加上允许的延迟之前到达的数据元,仍然添加到窗口中。根据使用的触发器,延迟但未丢弃的数据元可能会导致窗口再次触发。就是这种情况`EventTimeTrigger`。 为了使这项工作,Flink保持窗口的状态,直到他们允许的延迟到期。一旦发生这种情况,Flink将删除窗口并删除其状态,如“ [窗口生命周期”](#window-lifecycle)部分中所述。 默认默认情况下,允许的延迟设置为 `0`。也就是说,到达水印后面的数据元将被丢弃。 你可以像这样指定一个允许的迟到: * [**Java**](#tab_java_14) * [**Scala**](#tab_scala_14) ``` DataStream input = ...; input .keyBy() .window() .allowedLateness(