Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink and how programmers can make the most of the functionality it offers.
The general structure of a windowed Flink program is presented below. The first snippet refers to _keyed_ streams, while the second to _non-keyed_ ones. As one can see, the only difference is the `keyBy(...)` call for the keyed streams and the `window(...)` which becomes `windowAll(...)` for non-keyed streams. This is also going to serve as a roadmap for the rest of the page.
In the above, the commands in square brackets ([…]) are optional. This reveals that Flink allows you to customize your windowing logic in many different ways so that it best fits your needs.
## Window Lifecycle

In a nutshell, a window is **created** as soon as the first element that should belong to this window arrives, and the window is **completely removed** when the time (event or processing time) passes its end timestamp plus the user-specified `allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink guarantees removal only for time-based windows and not for other types, _e.g._ global windows (see [Window Assigners](#window-assigners)). For example, with an event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed lateness of 1 min, Flink will create a new window for the interval between `12:00` and `12:05` when the first element with a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the `12:06` timestamp.
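The lifecycle arithmetic above can be sketched in a few lines. This is a hypothetical, language-agnostic Python sketch of the semantics (the names `window_for` and `cleanup_time` are illustrative, not Flink API):

```python
# Sketch: event-time tumbling windows of 5 minutes with 1 minute allowed
# lateness. Compute the window an element creates and the watermark at which
# Flink would remove that window.
SIZE_MS = 5 * 60 * 1000          # 5-minute tumbling windows
ALLOWED_LATENESS_MS = 60 * 1000  # 1 minute of allowed lateness

def window_for(timestamp_ms):
    """Return (start, end) of the tumbling window the element falls into."""
    start = timestamp_ms - (timestamp_ms % SIZE_MS)
    return start, start + SIZE_MS

def cleanup_time(window_end_ms):
    """The window is removed once the watermark passes end + allowed lateness."""
    return window_end_ms + ALLOWED_LATENESS_MS

# An element with timestamp 12:03 creates the [12:00, 12:05) window,
# which is removed when the watermark passes 12:06.
start, end = window_for((12 * 60 + 3) * 60 * 1000)
```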
In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`ProcessWindowFunction`, `ReduceFunction`, `AggregateFunction` or `FoldFunction`) (see [Window Functions](#window-functions)) attached to it. The function will contain the computation to be applied to the contents of the window, while the `Trigger` specifies the conditions under which the window is considered ready for the function to be applied. A triggering policy might be something like “when the number of elements in the window is more than 4”, or “when the watermark passes the end of the window”. A trigger can also decide to purge a window’s contents any time between its creation and removal. Purging in this case only refers to the elements in the window, and _not_ the window metadata. This means that new data can still be added to that window.
Apart from the above, you can specify an `Evictor` (see [Evictors](#evictors)) which will be able to remove elements from the window after the trigger fires and before and/or after the function is applied.
In the following we go into more detail for each of the components above. We start with the required parts in the above snippet (see [Keyed vs Non-Keyed Windows](#keyed-vs-non-keyed-windows), [Window Assigners](#window-assigners), and [Window Functions](#window-functions)) before moving to the optional ones.
## Keyed vs Non-Keyed Windows

The first thing to specify is whether your stream should be keyed or not. This has to be done before defining the window. Using `keyBy(...)` will split your infinite stream into logical keyed streams. If `keyBy(...)` is not called, your stream is not keyed.
In the case of keyed streams, any attribute of your incoming events can be used as a key (more details [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/api_concepts.html#specifying-keys)). Having a keyed stream will allow your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest. All elements referring to the same key will be sent to the same parallel task.
In case of non-keyed streams, your original stream will not be split into multiple logical streams and all the windowing logic will be performed by a single task, _i.e._ with parallelism of 1.
## Window Assigners

After specifying whether your stream is keyed or not, the next step is to define a _window assigner_. The window assigner defines how elements are assigned to windows. This is done by specifying the `WindowAssigner` of your choice in the `window(...)` (for _keyed_ streams) or the `windowAll(...)` (for _non-keyed_ streams) call.
A `WindowAssigner` is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely _tumbling windows_, _sliding windows_, _session windows_ and _global windows_. You can also implement a custom window assigner by extending the `WindowAssigner` class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can either be processing time or event time. Please take a look at our section on [event time](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html) to learn about the difference between processing time and event time and how timestamps and watermarks are generated.
Time-based windows have a _start timestamp_ (inclusive) and an _end timestamp_ (exclusive) that together describe the size of the window. In code, Flink uses `TimeWindow` when working with time-based windows, which has methods for querying the start and end timestamp and also an additional method `maxTimestamp()` that returns the largest allowed timestamp for a given window.
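The exclusive end timestamp is what makes `maxTimestamp()` equal to `end - 1`. A minimal Python sketch of these accessors (the class name mirrors Flink's `TimeWindow`, but this is not the real class):

```python
# Sketch of the TimeWindow accessors described above.
class TimeWindow:
    def __init__(self, start, end):
        self.start = start  # inclusive
        self.end = end      # exclusive

    def max_timestamp(self):
        # Largest timestamp still inside the window, since `end` is exclusive.
        return self.end - 1

w = TimeWindow(0, 5000)
# w.max_timestamp() == 4999
```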
In the following, we show how Flink’s pre-defined window assigners work and how they are used in a DataStream program. The following figures visualize the workings of each assigner. The purple circles represent elements of the stream, which are partitioned by some key (in this case _user 1_, _user 2_ and _user 3_). The x-axis shows the progress of time.
### Tumbling Windows

A _tumbling windows_ assigner assigns each element to a window of a specified _window size_. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes as illustrated by the following figure.
The following code snippets show how to use tumbling windows.
...
...
Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`, `Time.minutes(x)`, and so on.
As shown in the last example, tumbling window assigners also take an optional `offset` parameter that can be used to change the alignment of windows. For example, without offsets hourly tumbling windows are aligned with epoch, that is you will get windows such as `1:00:00.000 - 1:59:59.999`, `2:00:00.000 - 2:59:59.999` and so on. If you want to change that you can give an offset. With an offset of 15 minutes you would, for example, get `1:15:00.000 - 2:14:59.999`, `2:15:00.000 - 3:14:59.999` etc. An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of `Time.hours(-8)`.
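The alignment described above can be sketched with simple modular arithmetic (this is a Python sketch that, for non-negative inputs, mirrors what Flink's `TimeWindow.getWindowStartWithOffset` computes; it is not the Flink API itself):

```python
# Sketch of tumbling-window alignment with an optional offset.
def tumbling_window_start(timestamp, size, offset=0):
    return timestamp - (timestamp - offset) % size

HOUR = 60 * 60 * 1000
# Without an offset, hourly windows are aligned with the epoch:
# an element at 1:30 falls into the window starting at 1:00.
assert tumbling_window_start(90 * 60 * 1000, HOUR) == HOUR
# With a 15-minute offset, the same element falls into the window starting 1:15.
assert tumbling_window_start(90 * 60 * 1000, HOUR, 15 * 60 * 1000) == 75 * 60 * 1000
```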
### Sliding Windows

The _sliding windows_ assigner assigns elements to windows of fixed length. Similar to a tumbling windows assigner, the size of the windows is configured by the _window size_ parameter. An additional _window slide_ parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows.
For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get a new window every 5 minutes, containing the events that arrived during the last 10 minutes, as depicted by the following figure.
The following code snippets show how to use sliding windows.
...
...
Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`, `Time.minutes(x)`, and so on.
As shown in the last example, sliding window assigners also take an optional `offset` parameter that can be used to change the alignment of windows. For example, without offsets hourly windows sliding by 30 minutes are aligned with epoch, that is you will get windows such as `1:00:00.000 - 1:59:59.999`, `1:30:00.000 - 2:29:59.999` and so on. If you want to change that you can give an offset. With an offset of 15 minutes you would, for example, get `1:15:00.000 - 2:14:59.999`, `1:45:00.000 - 2:44:59.999` etc. An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of `Time.hours(-8)`.
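Because the slide can be smaller than the size, one element lands in `size / slide` windows. A Python sketch of the assignment logic (illustrative, not the Flink API):

```python
# Sketch of sliding-window assignment: walk backwards from the most recent
# window start and collect every window that still contains the timestamp.
def sliding_windows(timestamp, size, slide, offset=0):
    windows = []
    start = timestamp - (timestamp - offset) % slide  # most recent window start
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= slide
    return windows

MIN = 60 * 1000
# A 10-minute window sliding by 5 minutes: each element belongs to 2 windows.
ws = sliding_windows(12 * MIN, 10 * MIN, 5 * MIN)
# ws == [(10*MIN, 20*MIN), (5*MIN, 15*MIN)]
```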
### Session Windows

The _session windows_ assigner groups elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time, in contrast to _tumbling windows_ and _sliding windows_. Instead, a session window closes when it does not receive elements for a certain period of time, _i.e._, when a gap of inactivity occurs. A session window assigner can be configured with either a static _session gap_ or with a _session gap extractor_ function which defines how long the period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.
Dynamic gaps are specified by implementing the `SessionWindowTimeGapExtractor` interface.
Attention Since session windows do not have a fixed start and end, they are evaluated differently than tumbling and sliding windows. Internally, a session window operator creates a new window for each arriving record and merges windows together if they are closer to each other than the defined gap. In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging [Window Function](#window-functions), such as `ReduceFunction`, `AggregateFunction`, or `ProcessWindowFunction` (`FoldFunction` cannot merge).
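The per-record create-and-merge behaviour can be sketched as follows; this is a simplified Python model of the semantics described above, not Flink's implementation:

```python
# Sketch: every element starts a [ts, ts + gap) window; windows that overlap
# (i.e. are closer than the gap) are merged into one session.
def session_windows(timestamps, gap):
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:  # within the gap of the last session
            sessions[-1] = (sessions[-1][0], max(sessions[-1][1], ts + gap))
        else:
            sessions.append((ts, ts + gap))
    return sessions

# With a gap of 10, elements 1, 5, 8 form one session; 30 starts a new one.
# session_windows([1, 5, 8, 30], 10) == [(1, 18), (30, 40)]
```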
### Global Windows

A _global windows_ assigner assigns all elements with the same key to the same single _global window_. This windowing scheme is only useful if you also specify a custom [trigger](#triggers). Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.
The following code snippets show how to use a global window.
...
...
## Window Functions
After defining the window assigner, we need to specify the computation that we want to perform on each of these windows. This is the responsibility of the _window function_, which is used to process the elements of each (possibly keyed) window once the system determines that a window is ready for processing (see [triggers](#triggers) for how Flink determines when a window is ready).
The window function can be one of `ReduceFunction`, `AggregateFunction`, `FoldFunction` or `ProcessWindowFunction`. The first two can be executed more efficiently (see the [State Size](#useful-state-size-considerations) section) because Flink can incrementally aggregate the elements for each window as they arrive. A `ProcessWindowFunction` gets an `Iterable` for all the elements contained in a window and additional meta information about the window to which the elements belong.
A windowed transformation with a `ProcessWindowFunction` cannot be executed as efficiently as the other cases because Flink has to buffer _all_ elements for a window internally before invoking the function. This can be mitigated by combining a `ProcessWindowFunction` with a `ReduceFunction`, `AggregateFunction`, or `FoldFunction` to get both incremental aggregation of window elements and the additional window metadata that the `ProcessWindowFunction` receives. We will look at examples for each of these variants.
A `ReduceFunction` specifies how two elements from the input are combined to produce an output element of the same type. Flink uses a `ReduceFunction` to incrementally aggregate the elements of a window.
A `ReduceFunction` can be defined and used like this:
...
...
The above example sums up the second fields of the tuples for all elements in a window.
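What a `ReduceFunction` does incrementally can be sketched in plain Python (this stands in for the Flink interface; only the running aggregate is kept as window state):

```python
# Sketch: fold each arriving (key, value) tuple into a running per-window
# aggregate by summing the second field, as in the example above.
def reduce_fn(v1, v2):
    return (v1[0], v1[1] + v2[1])

window_contents = [("a", 1), ("a", 3), ("a", 5)]
acc = window_contents[0]
for element in window_contents[1:]:
    acc = reduce_fn(acc, element)  # only `acc` is stored, not the elements
# acc == ("a", 9)
```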
### AggregateFunction
An `AggregateFunction` is a generalized version of a `ReduceFunction` that has three types: an input type (`IN`), accumulator type (`ACC`), and an output type (`OUT`). The input type is the type of elements in the input stream and the `AggregateFunction` has a method for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator, for merging two accumulators into one accumulator and for extracting an output (of type `OUT`) from an accumulator. We will see how this works in the example below.
Same as with `ReduceFunction`, Flink will incrementally aggregate input elements of a window as they arrive.
An `AggregateFunction` can be defined and used like this:
...
...
The above example computes the average of the second field of the elements in the window.
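The `IN`/`ACC`/`OUT` contract can be sketched as follows; the method names mirror the interface described above, but this is an illustrative Python stand-in, not Flink's `AggregateFunction`:

```python
# Sketch: compute the average of the second tuple field.
# IN = (str, int) tuples, ACC = (sum, count), OUT = float.
class AverageAggregate:
    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add(self, value, acc):
        return (acc[0] + value[1], acc[1] + 1)

    def merge(self, a, b):
        return (a[0] + b[0], a[1] + b[1])

    def get_result(self, acc):
        return acc[0] / acc[1]

agg = AverageAggregate()
acc = agg.create_accumulator()
for element in [("a", 2), ("a", 4), ("a", 6)]:
    acc = agg.add(element, acc)  # incremental: only the accumulator is stored
# agg.get_result(acc) == 4.0
```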
### FoldFunction
A `FoldFunction` specifies how an input element of the window is combined with an element of the output type. The `FoldFunction` is incrementally called for each element that is added to the window and the current output value. The first element is combined with a pre-defined initial value of the output type.
A `FoldFunction` can be defined and used like this:
...
...
The above example appends all input `Long` values to an initially empty `String`.
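The fold semantics can be sketched in Python (an illustrative stand-in for the Flink interface; note `FoldFunction` is deprecated in later Flink versions in favour of `AggregateFunction`):

```python
# Sketch: combine each Long input with a String accumulator, starting from a
# pre-defined initial value of the output type, as in the example above.
def fold_fn(acc: str, value: int) -> str:
    return acc + str(value)

acc = ""  # pre-defined initial value of the output type
for element in [1, 2, 3]:
    acc = fold_fn(acc, element)  # called incrementally per element
# acc == "123"
```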
Attention `fold()` cannot be used with session windows or other mergeable windows.
### ProcessWindowFunction
A `ProcessWindowFunction` gets an `Iterable` containing all the elements of the window, and a `Context` object with access to time and state information, which enables it to provide more flexibility than other window functions. This comes at the cost of performance and resource consumption, because elements cannot be incrementally aggregated but instead need to be buffered internally until the window is considered ready for processing.
The signature of `ProcessWindowFunction` looks as follows:
```scala
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
    * Evaluates the window and outputs none or several elements.
    */
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT])

  /**
    * The context holding window metadata.
    */
  abstract class Context {
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    def windowState: KeyedStateStore
    def globalState: KeyedStateStore
  }
}
```
Note The `key` parameter is the key that is extracted via the `KeySelector` that was specified for the `keyBy()` invocation. In case of tuple-index keys or string-field references this key type is always `Tuple` and you have to manually cast it to a tuple of the correct size to extract the key fields.
A `ProcessWindowFunction` can be defined and used like this:
...
...
The example shows a `ProcessWindowFunction` that counts the elements in a window. In addition, the window function adds information about the window to the output.
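The shape of that computation can be sketched in Python (illustrative, not the Flink API): the function receives the key, the window metadata, and an iterable over *all* buffered elements.

```python
# Sketch: count the elements of a window and attach window information,
# mirroring the ProcessWindowFunction example described above.
def process(key, window, elements, out):
    count = len(list(elements))  # the whole window content is buffered
    out.append(f"Window {window} count: {count}")

out = []
process("user-1", (0, 5000), [("user-1", 1), ("user-1", 2)], out)
# out == ["Window (0, 5000) count: 2"]
```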
Attention Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` or `AggregateFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`.
### ProcessWindowFunction with Incremental Aggregation
A `ProcessWindowFunction` can be combined with either a `ReduceFunction`, an `AggregateFunction`, or a `FoldFunction` to incrementally aggregate elements as they arrive in the window. When the window is closed, the `ProcessWindowFunction` will be provided with the aggregated result. This allows it to incrementally compute windows while having access to the additional window meta information of the `ProcessWindowFunction`.
#### Incremental Window Aggregation with ReduceFunction
The following example shows how an incremental `ReduceFunction` can be combined with a `ProcessWindowFunction` to return the smallest event in a window along with the start time of the window.
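The division of labour can be sketched as follows (a hypothetical Python model, not the Flink API): the reduce step runs per element and keeps only one value, and the process step runs once per firing and sees only that pre-aggregated value plus the window metadata.

```python
# Sketch: incremental min-reduction, then a process step that attaches the
# window start time when the window fires.
def min_reduce(r1, r2):
    return r1 if r1[1] < r2[1] else r2

def process_with_start(window_start, reduced, out):
    # Receives the single pre-aggregated element, not the whole buffer.
    out.append((window_start, reduced))

acc = ("a", 7)
for element in [("a", 3), ("a", 9)]:
    acc = min_reduce(acc, element)

out = []
process_with_start(0, acc, out)
# out == [(0, ("a", 3))]
```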
#### Incremental Window Aggregation with AggregateFunction
The following example shows how an incremental `AggregateFunction` can be combined with a `ProcessWindowFunction` to compute the average and also emit the key and window along with the average.
#### Incremental Window Aggregation with FoldFunction
The following example shows how an incremental `FoldFunction` can be combined with a `ProcessWindowFunction` to extract the number of events in the window and return also the key and end time of the window.
### Using per-window state in ProcessWindowFunction
In addition to accessing keyed state (as any rich function can), a `ProcessWindowFunction` can also use keyed state that is scoped to the window that the function is currently processing. In this context it is important to understand which window _per-window_ state refers to. There are different “windows” involved:
* The window that was defined when specifying the windowed operation: This might be _tumbling windows of 1 hour_ or _sliding windows of 2 hours that slide by 1 hour_.
* An actual instance of a defined window for a given key: This might be _time window from 12:00 to 13:00 for user-id xyz_. This is based on the window definition and there will be many windows based on the number of keys that the job is currently processing and based on what time slots the events fall into.
Per-window state is tied to the latter of those two. This means that if we process events for 1000 different keys and events for all of them currently fall into the _[12:00, 13:00)_ time window, then there will be 1000 window instances, each with its own keyed per-window state.
There are two methods on the `Context` object that a `process()` invocation receives that allow access to the two types of state:
* `globalState()`, which allows access to keyed state that is not scoped to a window
* `windowState()`, which allows access to keyed state that is also scoped to the window
This feature is helpful if you anticipate multiple firings for the same window, as can happen when you have late firings for data that arrives late or when you have a custom trigger that does speculative early firings. In such a case you would store information about previous firings or the number of firings in per-window state.
When using windowed state it is important to also clean up that state when a window is cleared. This should happen in the `clear()` method.
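The two scopes and the cleanup obligation can be sketched as follows (a hypothetical Python model with plain dicts standing in for Flink's keyed state stores):

```python
# Sketch: globalState() is scoped to the key only; windowState() is scoped to
# the key *and* one window instance. A firing counter is a typical use of
# per-window state; clear() must drop the per-window entry on window removal.
global_state = {}  # key -> value (survives window cleanup)
window_state = {}  # (key, window) -> value (must be cleaned up in clear())

def on_fire(key, window):
    global_state[key] = global_state.get(key, 0) + 1
    window_state[(key, window)] = window_state.get((key, window), 0) + 1
    return window_state[(key, window)]

def clear(key, window):
    window_state.pop((key, window), None)  # clean up when the window is purged

assert on_fire("xyz", (0, 3600)) == 1  # main firing
assert on_fire("xyz", (0, 3600)) == 2  # late firing for the same window
clear("xyz", (0, 3600))
```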
### WindowFunction (Legacy)
In some places where a `ProcessWindowFunction` can be used you can also use a `WindowFunction`. This is an older version of `ProcessWindowFunction` that provides less contextual information and does not have some advanced features, such as per-window keyed state. This interface will be deprecated at some point.
The signature of a `WindowFunction` looks as follows:
```scala
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {

  /**
    * Evaluates the window and outputs none or several elements.
    */
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
```
It can be used like this:
...
...
## Triggers
A `Trigger` determines when a window (as formed by the _window assigner_) is ready to be processed by the _window function_. Each `WindowAssigner` comes with a default `Trigger`. If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.
The trigger interface has five methods that allow a `Trigger` to react to different events:
* The `onElement()` method is called for each element that is added to a window.
* The `onEventTime()` method is called when a registered event-time timer fires.
* The `onProcessingTime()` method is called when a registered processing-time timer fires.
* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, _e.g._ when using session windows.
* Finally, the `clear()` method performs any action needed upon removal of the corresponding window.

Two things to notice about the above methods are:

1) The first three decide how to act on their invocation event by returning a `TriggerResult`. The action can be one of the following:

* `CONTINUE`: do nothing,
* `FIRE`: trigger the computation,
* `PURGE`: clear the elements in the window, and
* `FIRE_AND_PURGE`: trigger the computation and clear the elements in the window afterwards.

2) Any of these methods can be used to register processing- or event-time timers for future actions.
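The contract can be sketched with a count-based trigger (a hypothetical Python model: the real interface also receives the window, a timestamp, and a `TriggerContext`, and keeps its count in fault-tolerant state):

```python
# Sketch: onElement() returns a TriggerResult, firing and purging once the
# window holds more than 4 elements.
CONTINUE, FIRE, PURGE, FIRE_AND_PURGE = "CONTINUE", "FIRE", "PURGE", "FIRE_AND_PURGE"

class CountTrigger:
    def __init__(self, max_count):
        self.max_count = max_count
        self.count = 0  # real triggers keep this in fault-tolerant state

    def on_element(self, element):
        self.count += 1
        if self.count > self.max_count:
            self.count = 0
            return FIRE_AND_PURGE
        return CONTINUE

trigger = CountTrigger(4)
results = [trigger.on_element(i) for i in range(5)]
# the first four elements yield CONTINUE, the fifth yields FIRE_AND_PURGE
```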
### Fire and Purge
Once a trigger determines that a window is ready for processing, it fires, _i.e._, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the window operator to emit the result of the current window. Given a window with a `ProcessWindowFunction` all elements are passed to the `ProcessWindowFunction` (possibly after passing them to an evictor). Windows with `ReduceFunction`, `AggregateFunction`, or `FoldFunction` simply emit their eagerly aggregated result.
When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content. By default, the pre-implemented triggers simply `FIRE` without purging the window state.
Attention Purging will simply remove the contents of the window and will leave any potential meta-information about the window and any trigger state intact.
### Default Triggers of WindowAssigners
The default `Trigger` of a `WindowAssigner` is appropriate for many use cases. For example, all the event-time window assigners have an `EventTimeTrigger` as default trigger. This trigger simply fires once the watermark passes the end of a window.
Attention The default trigger of the `GlobalWindow` is the `NeverTrigger`, which never fires. Consequently, you always have to define a custom trigger when using a `GlobalWindow`.
Attention By specifying a trigger using `trigger()` you are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a `CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the progress of time but only by count. Right now, you have to write your own custom trigger if you want to react based on both time and count.
### Built-in and Custom Triggers

Flink comes with a few built-in triggers:

* The (already mentioned) `EventTimeTrigger` fires based on the progress of event-time as measured by watermarks.
* The `ProcessingTimeTrigger` fires based on processing time.
* The `CountTrigger` fires once the number of elements in a window exceeds the given limit.
* The `PurgingTrigger` takes as argument another trigger and transforms it into a purging one.
If you need to implement a custom trigger, you should check out the abstract [Trigger](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java) class. Please note that the API is still evolving and might change in future versions of Flink.
## Evictors

Flink’s windowing model allows specifying an optional `Evictor` in addition to the `WindowAssigner` and the `Trigger`. This can be done using the `evictor(...)` method (shown in the beginning of this document). The evictor has the ability to remove elements from a window _after_ the trigger fires and _before and/or after_ the window function is applied. To do so, the `Evictor` interface has two methods:
```java
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
```
The `evictBefore()` contains the eviction logic to be applied before the window function, while the `evictAfter()` contains the one to be applied after the window function. Elements evicted before the application of the window function will not be processed by it.
Flink comes with three pre-implemented evictors. These are:
* `CountEvictor`: keeps up to a user-specified number of elements from the window and discards the remaining ones from the beginning of the window buffer.
* `DeltaEvictor`: takes a `DeltaFunction` and a `threshold`, computes the delta between the last element in the window buffer and each of the remaining ones, and removes the ones with a delta greater or equal to the threshold.
* `TimeEvictor`: takes as argument an `interval` in milliseconds and for a given window, it finds the maximum timestamp `max_ts` among its elements and removes all the elements with timestamps smaller than `max_ts - interval`.
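The `CountEvictor` behaviour, for instance, can be sketched in Python (illustrative; the real interface mutates the buffer through an iterator rather than a list):

```python
# Sketch of CountEvictor-style logic: keep only the last `max_count` elements,
# discarding from the beginning of the window buffer.
def evict_before(elements, max_count):
    evict_n = max(0, len(elements) - max_count)
    del elements[:evict_n]  # drop the oldest elements in place

buffer = [1, 2, 3, 4, 5]
evict_before(buffer, 3)
# buffer == [3, 4, 5]
```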
Default By default, all the pre-implemented evictors apply their logic before the window function.
Attention Specifying an evictor prevents any pre-aggregation, as all the elements of a window have to be passed to the evictor before applying the computation.
Attention Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last.
## Allowed Lateness

When working with _event-time_ windowing, it can happen that elements arrive late, _i.e._ the watermark that Flink uses to keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See [event time](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html) and especially [late elements](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#late-elements) for a more thorough discussion of how Flink deals with event time.
By default, late elements are dropped when the watermark is past the end of the window. However, Flink allows you to specify a maximum _allowed lateness_ for window operators. Allowed lateness specifies by how much time elements can be late before they are dropped, and its default value is 0. Elements that arrive after the watermark has passed the end of the window but before it passes the end of the window plus the allowed lateness are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again. This is the case for the `EventTimeTrigger`.
In order to make this work, Flink keeps the state of windows until their allowed lateness expires. Once this happens, Flink removes the window and deletes its state, as also described in the [Window Lifecycle](#window-lifecycle) section.
Default By default, the allowed lateness is set to `0`. That is, elements that arrive behind the watermark will be dropped.
You can specify an allowed lateness like this:
...
...
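The resulting dropping rule can be sketched as follows (a hypothetical Python model of the semantics above, not the Flink API):

```python
# Sketch: an element for a given window is on-time while the watermark has not
# passed the window end, late-but-accepted while the watermark is below
# end + allowed lateness, and dropped afterwards.
def classify(window_end, watermark, allowed_lateness):
    if watermark < window_end:
        return "on-time"
    if watermark < window_end + allowed_lateness:
        return "late (may trigger a late firing)"
    return "dropped"

assert classify(5000, 4000, 1000) == "on-time"
assert classify(5000, 5500, 1000) == "late (may trigger a late firing)"
assert classify(5000, 7000, 1000) == "dropped"
```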
Note When using the `GlobalWindows` window assigner no data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
### Getting late data as a side output
Using Flink’s [side output](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/side_output.html) feature you can get a stream of the data that was discarded as late.
You first need to specify that you want to get late data using `sideOutputLateData(OutputTag)` on the windowed stream. Then, you can get the side-output stream on the result of the windowed operation:
### Late elements considerations
When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes the end of the window. In these cases, when a late but not dropped element arrives, it could trigger another firing for the window. These firings are called `late firings`, as they are triggered by late events and in contrast to the `main firing` which is the first firing of the window. In case of session windows, late firings can further lead to merging of windows, as they may “bridge” the gap between two pre-existing, unmerged windows.
Attention You should be aware that the elements emitted by a late firing should be treated as updated results of a previous computation, i.e., your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicated results into account or deduplicate them.
## Working with window results

The result of a windowed operation is again a `DataStream`; no information about the windowed operation is retained in the result elements, so if you want to keep meta-information about the window you have to manually encode that information in the result elements in your `ProcessWindowFunction`. The only relevant information that is set on the result elements is the element _timestamp_. This is set to the maximum allowed timestamp of the processed window, which is _end timestamp - 1_, since the window-end timestamp is exclusive. Note that this is true for both event-time and processing-time windows, _i.e._ after a windowed operation elements always have a timestamp, but this can be an event-time timestamp or a processing-time timestamp. For processing-time windows this has no special implications, but for event-time windows this, together with how watermarks interact with windows, enables [consecutive windowed operations](#consecutive-windowed-operations) with the same window sizes. We will cover this after taking a look at how watermarks interact with windows.
### Interaction of watermarks and windows
Before continuing in this section you might want to take a look at our section about [event time and watermarks](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html).
When watermarks arrive at the window operator this triggers two things:
* the watermark triggers computation of all windows where the maximum timestamp (which is _end-timestamp - 1_) is smaller than the new watermark
* the watermark is forwarded (as is) to downstream operations
Intuitively, a watermark “flushes” out any windows that would be considered late in downstream operations once they receive that watermark.
### Consecutive windowed operations
As mentioned before, the way the timestamp of windowed results is computed and how watermarks interact with windows allows stringing together consecutive windowed operations. This can be useful when you want to do two consecutive windowed operations where you want to use different keys but still want elements from the same upstream window to end up in the same downstream window. Consider this example:
In this example, the results for time window `[0, 5)` from the first operation will also end up in time window `[0, 5)` in the subsequent windowed operation. This allows calculating a sum per key and then calculating the top-k elements within the same window in the second operation.
## Useful State Size Considerations

Windows can be defined over long periods of time (such as days, weeks, or months) and therefore accumulate very large state. There are a couple of rules to keep in mind when estimating the storage requirements of your windowing computation:
1. Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several copies of each element, as explained in the [Window Assigners](#window-assigners) section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
2. `ReduceFunction`, `AggregateFunction`, and `FoldFunction` can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast, just using a `ProcessWindowFunction` requires accumulating all elements.
3. Using an `Evictor` prevents any pre-aggregation, as all the elements of a window have to be passed through the evictor before applying the computation (see [Evictors](#evictors)).