diff --git a/docs/apis/streaming/non-windowed.svg b/docs/apis/streaming/non-windowed.svg new file mode 100644 index 0000000000000000000000000000000000000000..3c1cdaa3efdf6b8ab3f65c334d2d8977ac32c537 --- /dev/null +++ b/docs/apis/streaming/non-windowed.svg @@ -0,0 +1,22 @@ + + + + + diff --git a/docs/apis/streaming/session-windows.svg b/docs/apis/streaming/session-windows.svg new file mode 100644 index 0000000000000000000000000000000000000000..92785c7f8f46691f1be51bfd108083fb8d340439 --- /dev/null +++ b/docs/apis/streaming/session-windows.svg @@ -0,0 +1,22 @@ + + + + + diff --git a/docs/apis/streaming/sliding-windows.svg b/docs/apis/streaming/sliding-windows.svg new file mode 100644 index 0000000000000000000000000000000000000000..32c6bf037e1ab852fc144354933a84078d80ec10 --- /dev/null +++ b/docs/apis/streaming/sliding-windows.svg @@ -0,0 +1,22 @@ + + + + + diff --git a/docs/apis/streaming/tumbling-windows.svg b/docs/apis/streaming/tumbling-windows.svg new file mode 100644 index 0000000000000000000000000000000000000000..1857076c54aedf9e81c7b4a5cd94234c8bacfb24 --- /dev/null +++ b/docs/apis/streaming/tumbling-windows.svg @@ -0,0 +1,22 @@ + + + + + diff --git a/docs/apis/streaming/windows.md b/docs/apis/streaming/windows.md index 90ad0dec9eec3ff5bc88e65c2ed2f447532cd6a3..fba17fc1bbe6e2f15ec1e5661c989804ee243ebd 100644 --- a/docs/apis/streaming/windows.md +++ b/docs/apis/streaming/windows.md @@ -24,1023 +24,616 @@ specific language governing permissions and limitations under the License. --> +Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into finite +slices based on the timestamps of elements or other criteria. This division is required when working +with infinite streams of data and performing transformations that aggregate elements. + +Info We will mostly talk about *keyed windowing* here, i.e. +windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are +subdivided based on both window and key before being given to +a user function. The work can thus be distributed across the cluster +because the elements for different keys can be processed independently. If you absolutely have to, +you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed +windows work. + * This will be replaced by the TOC {:toc} -## Windows on Keyed Data Streams - -Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*, -i.e., each window will contain elements with the same key value. +## Basics -### Basic Window Constructs +For a windowed transformation you must at least specify a *key* +(see [specifying keys](/apis/common/index.html#specifying-keys)), +a *window assigner* and a *window function*. The *key* divides the infinite, non-keyed, stream +into logical keyed streams while the *window assigner* assigns elements to finite per-key windows. +Finally, the *window function* is used to process the elements of each window. -Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows -for common use cases. See first if your use case can be served by the pre-defined windows below before moving -to defining your own windows. +The basic structure of a windowed transformation is thus as follows:
+{% highlight java %} +DataStream input = ...; -
- - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Tumbling time window
KeyedStream → WindowedStream
-

- Defines a window of 5 seconds, that "tumbles". This means that elements are - grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. - The notion of time is specified by the selected TimeCharacteristic (see time). - {% highlight java %} -keyedStream.timeWindow(Time.seconds(5)); - {% endhighlight %} -

-
Sliding time window
KeyedStream → WindowedStream
-

- Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are - grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than - one window (since windows overlap by at most 4 seconds) - The notion of time is specified by the selected TimeCharacteristic (see time). - {% highlight java %} -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1)); - {% endhighlight %} -

-
Tumbling count window
KeyedStream → WindowedStream
-

- Defines a window of 1000 elements, that "tumbles". This means that elements are - grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, - and every element belongs to exactly one window. - {% highlight java %} -keyedStream.countWindow(1000); - {% endhighlight %} -

-
Sliding count window
KeyedStream → WindowedStream
-

- Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are - grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, - and every element can belong to more than one window (as windows overlap by at most 900 elements). - {% highlight java %} -keyedStream.countWindow(1000, 100) - {% endhighlight %} -

-
- +input + .keyBy() + .window() + .(); +{% endhighlight %}
+{% highlight scala %} +val input: DataStream[T] = ... -
- - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Tumbling time window
KeyedStream → WindowedStream
-

- Defines a window of 5 seconds, that "tumbles". This means that elements are - grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. - The notion of time is specified by the selected TimeCharacteristic (see time). - {% highlight scala %} -keyedStream.timeWindow(Time.seconds(5)) - {% endhighlight %} -

-
Sliding time window
KeyedStream → WindowedStream
-

- Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are - grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than - one window (since windows overlap by at most 4 seconds) - The notion of time is specified by the selected TimeCharacteristic (see time). - {% highlight scala %} -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1)) - {% endhighlight %} -

-
Tumbling count window
KeyedStream → WindowedStream
-

- Defines a window of 1000 elements, that "tumbles". This means that elements are - grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, - and every element belongs to exactly one window. - {% highlight scala %} -keyedStream.countWindow(1000) - {% endhighlight %} -

-
Sliding count window
KeyedStream → WindowedStream
-

- Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are - grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, - and every element can belong to more than one window (as windows overlap by at most 900 elements). - {% highlight scala %} -keyedStream.countWindow(1000, 100) - {% endhighlight %} -

-
- +input + .keyBy() + .window() + .() +{% endhighlight %}
-### Advanced Window Constructs +We will cover [window assigners](#window-assigners) in a separate section below. + +The window transformation can be one of `reduce()`, `fold()` or `apply()`. Which respectively +takes a `ReduceFunction`, `FoldFunction` or `WindowFunction`. We describe each of these ways +of specifying a windowed transformation in detail below: [window functions](#window-functions). + +For more advanced use cases you can also specify a `Trigger` that determines when exactly a window +is being considered as *ready for processing*. These will be covered in more detail in +[triggers](#triggers). + +## Window Assigners + +The window assigner specifies how elements of the stream are divided into finite slices. Flink comes +with pre-implemented window assigners for the most typical use cases, namely *tumbling windows*, +*sliding windows*, *session windows* and *global windows*, but you can implement your own by +extending the `WindowAssigner` class. All the built-in window assigners, except for the global +windows one, assign elements to windows based on time, which can either be processing time or event +time. Please take a look at our section on [event time](/apis/streaming/event_time.html) for more +information about how Flink deals with time. + +Let's first look at how each of these window assigners works before looking at how they can be used +in a Flink program. We will be using abstract figures to visualize the workings of each assigner: +in the following, the purple circles are elements of the stream, they are partitioned +by some key (in this case *user 1*, *user 2* and *user 3*) and the x-axis shows the progress +of time. + +### Global Windows + +Global windows are a way of specifying that we don't want to subdivide our elements into windows. +Each element is assigned to one single per-key *global window*. +This windowing scheme is only useful if you also specify a custom [trigger](#triggers). Otherwise, +no computation is ever going to be performed, as the global window does not have a natural end at +which we could process the aggregated elements. + + + +### Tumbling Windows + +A *tumbling windows* assigner assigns elements to fixed length, non-overlapping windows of a +specified *window size*.. For example, if you specify a window size of 5 minutes, the window +function will get 5 minutes worth of elements in each invocation. -The general mechanism can define more powerful windows at the cost of more verbose syntax. For example, -below is a window definition where windows hold elements of the last 5 seconds and slides every 1 second, -but the execution of the window function is triggered when 100 elements have been added to the -window, and every time execution is triggered, 10 elements are retained in the window: + + +### Sliding Windows + +The *sliding windows* assigner assigns elements to windows of fixed length equal to *window size*, +as the tumbling windows assigner, but in this case, windows can be overlapping. The size of the +overlap is defined by the user-specified parameter *window slide*. As windows are overlapping, an +element can be assigned to multiple windows + +For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get 10 +minutes worth of elements in each invocation of the window function and it will be invoked for every +5 minutes of data. + + + +### Session Windows + +The *session windows* assigner is ideal for cases where the window boundaries need to adjust to the +incoming data. Both the *tumbling windows* and *sliding windows* assigner assign elements to windows +that start at fixed time points and have a fixed *window size*. With session windows it is possible +to have windows that start at individual points in time for each key and that end once there has +been a certain period of inactivity. The configuration parameter is the *session gap* that specifies +how long to wait for new data before considering a session as closed. + + + +### Specifying a Window Assigner + +The built-in window assigners (except `GlobalWindows`) come in two versions. One for processing-time +windowing and one for event-time windowing. The processing-time assigners assign elements to +windows based on the current clock of the worker machines while the event-time assigners assign +windows based on the timestamps of elements. Please have a look at +[event time](/apis/streaming/event_time.html) to learn about the difference between processing time +and event time and about how timestamps can be assigned to elements. + +The following code snippets show how each of the window assigners can be used in a program:
{% highlight java %} -keyedStream - .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)) - .trigger(CountTrigger.of(100)) - .evictor(CountEvictor.of(10)); +DataStream input = ...; + +// tumbling event-time windows +input + .keyBy() + .window(TumblingEventTimeWindows.of(Time.seconds(5))) + .(); + +// sliding event-time windows +input + .keyBy() + .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) + .(); + +// event-time session windows +input + .keyBy() + .window(EventTimeSessionWindows.withGap(Time.minutes(10))) + .(); + +// tumbling processing-time windows +input + .keyBy() + .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) + .(); + +// sliding processing-time windows +input + .keyBy() + .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) + .(); + +// processing-time session windows +input + .keyBy() + .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) + .(); + +// global windows +input + .keyBy() + .window(GlobalWindows.create()) + .(); {% endhighlight %}
{% highlight scala %} -keyedStream - .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)) - .trigger(CountTrigger.of(100)) - .evictor(CountEvictor.of(10)) +val input: DataStream[T] = ... + +// tumbling event-time windows +input + .keyBy() + .window(TumblingEventTimeWindows.of(Time.seconds(5))) + .() + +// sliding event-time windows +input + .keyBy() + .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) + .() + +// event-time session windows +input + .keyBy() + .window(EventTimeSessionWindows.withGap(Time.minutes(10))) + .() + +// tumbling processing-time windows +input + .keyBy() + .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) + .() + +// sliding processing-time windows +input + .keyBy() + .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) + .() + +// processing-time session windows +input + .keyBy() + .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) + .() + +// global windows +input + .keyBy() + .window(GlobalWindows.create()) {% endhighlight %}
-The general recipe for building a custom window is to specify (1) a `WindowAssigner`, (2) a `Trigger` (optionally), -and (3) an `Evictor` (optionally). +Note, how we can specify a time interval by using one of `Time.milliseconds(x)`, `Time.seconds(x)`, +`Time.minutes(x)`, and so on. -The `WindowAssigner` defines how incoming elements are assigned to windows. A window is a logical group of elements -that has a begin-value, and an end-value corresponding to a begin-time and end-time. Elements with timestamp (according -to some notion of time described above within these values are part of the window). +## Window Functions -For example, the `SlidingEventTimeWindows` -assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that -time starts from 0 and is measured in milliseconds. Then, we have 6 windows -that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming -element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be -assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your -own window types by extending the `WindowAssigner` class. +The *window function* is used to process the elements of each window (and key) once the system +determines that a window is ready for processing (see [triggers](#triggers) for how the system +determines when a window is ready). -
+The window function can be one of `ReduceFunction`, `FoldFunction` or `WindowFunction`. The first +two can be executed more efficiently because Flink can incrementally aggregate the elements for each +window as they arrive. A `WindowFunction` gets an `Iterable` for all the elements contained in a +window and additional meta information about the window to which the elements belong. + +A windowed transformation with a `WindowFunction` cannot be executed as efficiently as the other +cases because Flink has to buffer *all* elements for a window internally before invoking the function. +This can be mitigated by combining a `WindowFunction` with a `ReduceFunction` or `FoldFunction` to +get both incremental aggregation of window elements and the additional information that the +`WindowFunction` receives. We will look at examples for each of these variants. + +### ReduceFunction +A reduce function specifies how two values can be combined to form one element. Flink can use this +to incrementally aggregate the elements in a window. + +A `ReduceFunction` can be used in a program like this: + +
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Global window
KeyedStream → WindowedStream
-

- All incoming elements of a given key are assigned to the same window. - The window does not contain a default trigger, hence it will never be triggered - if a trigger is not explicitly specified. -

- {% highlight java %} -stream.window(GlobalWindows.create()); - {% endhighlight %} -
Tumbling event-time windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to a window of a certain size (1 second below) based on - their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. - This assigner comes with a default trigger that fires for a window when a - watermark with value higher than its end-value is received. -

- {% highlight java %} -stream.window(TumblingEventTimeWindows.of(Time.seconds(1))); - {% endhighlight %} -
Sliding event-time windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to a window of a certain size (5 seconds below) based on - their timestamp. Windows "slide" by the provided value (1 second in the example), and hence - overlap. This assigner comes with a default trigger that fires for a window when a - watermark with value higher than its end-value is received. -

- {% highlight java %} -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))); - {% endhighlight %} -
Tumbling processing time windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to a window of a certain size (1 second below) based on - the current processing time. Windows do not overlap, i.e., each element is assigned to exactly one window. - This assigner comes with a default trigger that fires for a window a window when the current - processing time exceeds its end-value. -

- {% highlight java %} -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))); - {% endhighlight %} -
Sliding processing time windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to a window of a certain size (5 seconds below) based on - their timestamp. Windows "slide" by the provided value (1 second in the example), and hence - overlap. This assigner comes with a default trigger that fires for a window a window when the current - processing time exceeds its end-value. -

- {% highlight java %} -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))); - {% endhighlight %} -
Event-time Session windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to sessions based on a session gap interval (5 seconds in the example below). - Elements whose timestamp differs by more than the session gap are assigned to different sessions. If there are - consecutive elements which are less than the session gap apart then these will also be put into the same session, i.e. elements - can be connected into a session by intermediate elements. -

- {% highlight scala %} -keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5))); - {% endhighlight %} -
Processing time Session windows
KeyedStream → WindowedStream
-

- This is similar to event-time session windows but works on the current processing - time instead of the timestamp of elements -

- {% highlight scala %} -keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))); - {% endhighlight %} -
+{% highlight java %} +DataStream> input = ...; + +input + .keyBy() + .window() + .reduce(new ReduceFunction> { + public Tuple2 reduce(Tuple2 v1, Tuple2 v2) { + return new Tuple2<>(v1.f0, v1.f1 + v2.f1); + } + }); +{% endhighlight %}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Global window
KeyedStream → WindowedStream
-

- All incoming elements of a given key are assigned to the same window. - The window does not contain a default trigger, hence it will never be triggered - if a trigger is not explicitly specified. -

- {% highlight scala %} -stream.window(GlobalWindows.create) - {% endhighlight %} -
Tumbling event-time windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to a window of a certain size (1 second below) based on - their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. - This assigner comes with a default trigger that fires for a window when a - watermark with value higher than its end-value is received. -

- {% highlight scala %} -stream.window(TumblingEventTimeWindows.of(Time.seconds(1))) - {% endhighlight %} -
Sliding event-time windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to a window of a certain size (5 seconds below) based on - their timestamp. Windows "slide" by the provided value (1 second in the example), and hence - overlap. This assigner comes with a default trigger that fires for a window when a - watermark with value higher than its end-value is received. -

- {% highlight scala %} -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))) - {% endhighlight %} -
Tumbling processing time windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to a window of a certain size (1 second below) based on - the current processing time. Windows do not overlap, i.e., each element is assigned to exactly one window. - This assigner comes with a default trigger that fires for a window a window when the current - processing time exceeds its end-value. - -

- {% highlight scala %} -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))) - {% endhighlight %} -
Sliding processing time windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to a window of a certain size (5 seconds below) based on - their timestamp. Windows "slide" by the provided value (1 second in the example), and hence - overlap. This assigner comes with a default trigger that fires for a window a window when the current - processing time exceeds its end-value. -

- {% highlight scala %} -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))) - {% endhighlight %} -
Event-time Session windows
KeyedStream → WindowedStream
-

- Incoming elements are assigned to sessions based on a session gap interval (5 seconds in the example below). - Elements whose timestamp differs by more than the session gap are assigned to different sessions. If there are - consecutive elements which are less than the session gap apart then these will also be put into the same session, i.e. elements - can be connected into a session by intermediate elements. -

- {% highlight scala %} -keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5))) - {% endhighlight %} -
Processing time Session windows
KeyedStream → WindowedStream
-

- This is similar to event-time session windows but works on the current processing - time instead of the timestamp of elements -

- {% highlight scala %} -keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) - {% endhighlight %} -
-
+{% highlight scala %} +val input: DataStream[(String, Long)] = ... +input + .keyBy() + .window() + .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) } +{% endhighlight %} +
-The `Trigger` specifies when the function that comes after the window clause (e.g., `sum`, `count`) is evaluated ("fires") -for each window. If a trigger is not specified, a default trigger for each window type is used (that is part of the -definition of the `WindowAssigner`). Flink comes bundled with a set of triggers if the ones that windows use by -default do not fit the application. You can write your own trigger by implementing the `Trigger` interface. Note that -specifying a trigger will override the default trigger of the window assigner. +A `ReduceFunction` specifies how two elements from the input can be combined to produce +an output element. This example will sum up the second field of the tuple for all elements +in a window. -
+### FoldFunction + +A fold function can be specified like this: +
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Processing time trigger -

- A window is fired when the current processing time exceeds its end-value. - The elements on the triggered window are henceforth discarded. -

{% highlight java %} -windowedStream.trigger(ProcessingTimeTrigger.create()); +DataStream> input = ...; + +input + .keyBy() + .window() + .fold("", new FoldFunction, String>> { + public String fold(String acc, Tuple2 value) { + return acc + value.f1; + } + }); {% endhighlight %} -
Watermark trigger -

- A window is fired when a watermark with value that exceeds the window's end-value has been received. - The elements on the triggered window are henceforth discarded. -

-{% highlight java %} -windowedStream.trigger(EventTimeTrigger.create()); -{% endhighlight %} -
Continuous processing time trigger -

- A window is periodically considered for being fired (every 5 seconds in the example). - The window is actually fired only when the current processing time exceeds its end-value. - The elements on the triggered window are retained. -

-{% highlight java %} -windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5))); -{% endhighlight %} -
Continuous watermark time trigger -

- A window is periodically considered for being fired (every 5 seconds in the example). - A window is actually fired when a watermark with value that exceeds the window's end-value has been received. - The elements on the triggered window are retained. -

-{% highlight java %} -windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5))); + + +
+{% highlight scala %} +val input: DataStream[(String, Long)] = ... + +input + .keyBy() + .window() + .fold("") { (acc, v) => acc + v._2 } {% endhighlight %} -
Count trigger -

- A window is fired when it has more than a certain number of elements (1000 below). - The elements of the triggered window are retained. -

+ + + +A `FoldFunction` specifies how elements from the input will be added to an initial +accumulator value (`""`, the empty string, in our example). This example will compute +a concatenation of all the `Long` fields of the input. + +### WindowFunction - The Generic Case + +Using a `WindowFunction` provides most flexibility, at the cost of performance. The reason for this +is that elements cannot be incrementally aggregated for a window and instead need to be buffered +internally until the window is considered ready for processing. A `WindowFunction` gets an +`Iterable` containing all the elements of the window being processed. The signature of +`WindowFunction` is this: + +
+
{% highlight java %} -windowedStream.trigger(CountTrigger.of(1000)); +public interface WindowFunction extends Function, Serializable { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + void apply(KEY key, W window, Iterable input, Collector out) throws Exception; +} {% endhighlight %} -
Purging trigger -

- Takes any trigger as an argument and forces the triggered window elements to be - "purged" (discarded) after triggering. -

-{% highlight java %} -windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000))); + + +
+{% highlight scala %} +public interface WindowFunction extends Function, Serializable { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + void apply(KEY key, W window, Iterable input, Collector out) throws Exception; +} {% endhighlight %} -
Delta trigger -

- A window is periodically considered for being fired (every 5000 milliseconds in the example). - A window is actually fired when the value of the last added element exceeds the value of - the first element inserted in the window according to a `DeltaFunction`. -

+ + + +Here we show an example that uses a `WindowFunction` to count the elements in a window. We do this +because we want to access information about the window itself to emit it along with the count. +This is very inefficient, however, and should be implemented with a +`ReduceFunction` in practice. Below, we will see an example of how a `ReduceFunction` can +be combined with a `WindowFunction` to get both incremental aggregation and the added +information of a `WindowFunction`. + +
+
{% highlight java %} -windowedStream.trigger(new DeltaTrigger.of(5000.0, new DeltaFunction() { - @Override - public double getDelta (Double old, Double new) { - return (new - old > 0.01); +DataStream> input = ...; + +input + .keyBy() + .window() + .apply(new MyWindowFunction()); + +/* ... */ + +public class MyWindowFunction implements WindowFunction, String, String, TimeWindow> { + + void apply(String key, TimeWindow window, Iterable> input, Collector out) { + long count = 0; + for (Tuple in: input) { + count++; } -})); + out.collect("Window: " + window + "count: " + count); + } +} + {% endhighlight %} -
-
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Processing time trigger -

- A window is fired when the current processing time exceeds its end-value. - The elements on the triggered window are henceforth discarded. -

-{% highlight scala %} -windowedStream.trigger(ProcessingTimeTrigger.create); -{% endhighlight %} -
Watermark trigger -

- A window is fired when a watermark with value that exceeds the window's end-value has been received. - The elements on the triggered window are henceforth discarded. -

-{% highlight scala %} -windowedStream.trigger(EventTimeTrigger.create); -{% endhighlight %} -
Continuous processing time trigger -

- A window is periodically considered for being fired (every 5 seconds in the example). - The window is actually fired only when the current processing time exceeds its end-value. - The elements on the triggered window are retained. -

-{% highlight scala %} -windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5))); -{% endhighlight %} -
Continuous watermark time trigger -

- A window is periodically considered for being fired (every 5 seconds in the example). - A window is actually fired when a watermark with value that exceeds the window's end-value has been received. - The elements on the triggered window are retained. -

-{% highlight scala %} -windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5))); -{% endhighlight %} -
Count trigger -

- A window is fired when it has more than a certain number of elements (1000 below). - The elements of the triggered window are retained. -

{% highlight scala %} -windowedStream.trigger(CountTrigger.of(1000)); +val input: DataStream[(String, Long)] = ... + +input + .keyBy() + .window() + .apply(new MyWindowFunction()) + +/* ... */ + +class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] { + + def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = { + var count = 0L + for (in <- input) { + count = count + 1 + } + out.collect(s"Window $window count: $count") + } +} {% endhighlight %} -
Purging trigger -

- Takes any trigger as an argument and forces the triggered window elements to be - "purged" (discarded) after triggering. -

-{% highlight scala %} -windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000))); + + + +### WindowFunction with Incremental Aggregation + +A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction`. When doing +this, the `ReduceFunction`/`FoldFunction` will be used to incrementally aggregate elements as they +arrive while the `WindowFunction` will be provided with the aggregated result when the window is +ready for processing. This allows to get the benefit of incremental window computation and also have +the additional meta information that writing a `WindowFunction` provides. + +This is an example that shows how incremental aggregation functions can be combined with +a `WindowFunction`. + +
+
+{% highlight java %} +DataStream> input = ...; + +// for folding incremental computation +input + .keyBy() + .window() + .apply(, new MyFoldFunction(), new MyWindowFunction()); + +// for reducing incremental computation +input + .keyBy() + .window() + .apply(new MyReduceFunction(), new MyWindowFunction()); {% endhighlight %} -
Delta trigger -

- A window is periodically considered for being fired (every 5000 milliseconds in the example). - A window is actually fired when the value of the last added element exceeds the value of - the first element inserted in the window according to a `DeltaFunction`. -

+ + +
{% highlight scala %} -windowedStream.trigger(DeltaTrigger.of(5000.0, { (old,new) => new - old > 0.01 })) +val input: DataStream[(String, Long)] = ... + +// for folding incremental computation +input + .keyBy() + .window() + .apply(, new MyFoldFunction(), new MyWindowFunction()) + +// for reducing incremental computation +input + .keyBy() + .window() + .apply(new MyReduceFunction(), new MyWindowFunction()) {% endhighlight %} -
-
-After the trigger fires, and before the function (e.g., `sum`, `count`) is applied to the window contents, an -optional `Evictor` removes some elements from the beginning of the window before the remaining elements -are passed on to the function. Flink comes bundled with a set of evictors You can write your own evictor by -implementing the `Evictor` interface. +## Dealing with Late Data -
+When working with event-time windowing it can happen that elements arrive late, i.e the +watermark that Flink uses to keep track of the progress of event-time is already past the +end timestamp of a window to which an element belongs. Please +see [event time](/apis/streaming/event_time.html) and especially +[late elements](/apis/streaming/event_time.html#late-elements) for a more thorough discussion of +how Flink deals with event time. + +You can specify how a windowed transformation should deal with late elements and how much lateness +is allowed. The parameter for this is called *allowed lateness*. This specifies by how much time +elements can be late. Elements that arrive within the allowed lateness are still put into windows +and are considered when computing window results. If elements arrive after the allowed lateness they +will be dropped. Flink will also make sure that any state held by the windowing operation is garbage +collected once the watermark passes the end of a window plus the allowed lateness. +You can specify an allowed lateness like this: + +
- - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Time evictor -

- Evict all elements from the beginning of the window, so that elements from end-value - 1 second - until end-value are retained (the resulting window size is 1 second). -

- {% highlight java %} -triggeredStream.evictor(TimeEvictor.of(Time.seconds(1))); - {% endhighlight %} -
Count evictor -

- Retain 1000 elements from the end of the window backwards, evicting all others. -

- {% highlight java %} -triggeredStream.evictor(CountEvictor.of(1000)); - {% endhighlight %} -
Delta evictor -

- Starting from the beginning of the window, evict elements until an element with - value lower than the value of the last element is found (by a threshold and a - DeltaFunction). -

- {% highlight java %} -triggeredStream.evictor(DeltaEvictor.of(5000, new DeltaFunction() { - public double getDelta (Double oldValue, Double newValue) { - return newValue - oldValue; - } -})); - {% endhighlight %} -
+{% highlight java %} +DataStream input = ...; + +input + .keyBy() + .window() + .allowedLateness(
- - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Time evictor -

- Evict all elements from the beginning of the window, so that elements from end-value - 1 second - until end-value are retained (the resulting window size is 1 second). -

- {% highlight scala %} -triggeredStream.evictor(TimeEvictor.of(Time.seconds(1))); - {% endhighlight %} -
Count evictor -

- Retain 1000 elements from the end of the window backwards, evicting all others. -

- {% highlight scala %} -triggeredStream.evictor(CountEvictor.of(1000)); - {% endhighlight %} -
Delta evictor -

- Starting from the beginning of the window, evict elements until an element with - value lower than the value of the last element is found (by a threshold and a - DeltaFunction). -

- {% highlight scala %} -windowedStream.evictor(DeltaEvictor.of(5000.0, { (old,new) => new - old > 0.01 })) - {% endhighlight %} -
-
+{% highlight scala %} +val input: DataStream[T] = ... +input + .keyBy() + .window() + .allowedLateness(
-### Recipes for Building Windows - -The mechanism of window assigner, trigger, and evictor is very powerful, and it allows you to define -many different kinds of windows. Flink's basic window constructs are, in fact, syntactic -sugar on top of the general mechanism. Below is how some common types of windows can be -constructed using the general mechanism - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Window typeDefinition
- Tumbling count window
- {% highlight java %} -stream.countWindow(1000) - {% endhighlight %} -
- {% highlight java %} -stream.window(GlobalWindows.create()) - .trigger(PurgingTrigger.of(CountTrigger.of(size))) - {% endhighlight %} -
- Sliding count window
- {% highlight java %} -stream.countWindow(1000, 100) - {% endhighlight %} -
- {% highlight java %} -stream.window(GlobalWindows.create()) - .evictor(CountEvictor.of(1000)) - .trigger(CountTrigger.of(100)) - {% endhighlight %} -
- Tumbling event time window
- {% highlight java %} -stream.timeWindow(Time.seconds(5)) - {% endhighlight %} -
- {% highlight java %} -stream.window(TumblingEventTimeWindows.of(Time.seconds(5)) - .trigger(EventTimeTrigger.create()) - {% endhighlight %} -
- Sliding event time window
- {% highlight java %} -stream.timeWindow(Time.seconds(5), Time.seconds(1)) - {% endhighlight %} -
- {% highlight java %} -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))) - .trigger(EventTimeTrigger.create()) - {% endhighlight %} -
- Tumbling processing time window
- {% highlight java %} -stream.timeWindow(Time.seconds(5)) - {% endhighlight %} -
- {% highlight java %} -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)) - .trigger(ProcessingTimeTrigger.create()) - {% endhighlight %} -
- Sliding processing time window
- {% highlight java %} -stream.timeWindow(Time.seconds(5), Time.seconds(1)) - {% endhighlight %} -
- {% highlight java %} -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))) - .trigger(ProcessingTimeTrigger.create()) - {% endhighlight %} -
- - -## Windows on Unkeyed Data Streams - -You can also define windows on regular (non-keyed) data streams using the `windowAll` transformation. These -windowed data streams have all the capabilities of keyed windowed data streams, but are evaluated at a single -task (and hence at a single computing node). The syntax for defining triggers and evictors is exactly the -same: +Note When using the `GlobalWindows` window assigner no +data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`. + +## Triggers + +A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being +processed by the *window function*. The trigger observes how elements are added to windows +and can also keep track of the progress of processing time and event time. Once a trigger +determines that a window is ready for processing, it fires. This is the signal for the +window operation to take the elements that are currently in the window and pass them along to +the window function to produce output for the firing window. + +Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be +appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as +default trigger. This trigger simply fires once the watermark passes the end of a window. + +You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The +whole specification of the windowed transformation would then look like this:
{% highlight java %} -nonKeyedStream - .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)) - .trigger(CountTrigger.of(100)) - .evictor(CountEvictor.of(10)); +DataStream input = ...; + +input + .keyBy() + .window() + .trigger() + .(); {% endhighlight %}
{% highlight scala %} -nonKeyedStream - .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)) - .trigger(CountTrigger.of(100)) - .evictor(CountEvictor.of(10)) +val input: DataStream[T] = ... + +input + .keyBy() + .window() + .trigger() + .() {% endhighlight %}
-Basic window definitions are also available for windows on non-keyed streams: +Flink comes with a few triggers out-of-box: there is the already mentioned `EventTimeTrigger` that +fires based on the progress of event-time as measured by the watermark, the `ProcessingTimeTrigger` +does the same but based on processing time and the `CountTrigger` fires once the number of elements +in a window exceeds the given limit. + +Attention By specifying a trigger using `trigger()` you +are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a +`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the +progress of time but only by count. Right now, you have to write your own custom trigger if +you want to react based on both time and count. + +The internal `Trigger` API is still considered experimental but you can check out the code +if you want to write your own custom trigger: +{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger.java" %}. + +## Non-keyed Windowing + +You can also leave out the `keyBy()` when specifying a windowed transformation. This means, however, +that Flink cannot process windows for different keys in parallel, essentially turning the +transformation into a non-parallel operation. + +Warning As mentioned in the introduction, non-keyed +windows have the disadvantage that work cannot be distributed in the cluster because +windows cannot be computed independently per key. This can have severe performance implications. + + +The basic structure of a non-keyed windowed transformation is as follows:
+{% highlight java %} +DataStream input = ...; -
- - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Tumbling time window all
DataStream → WindowedStream
-

- Defines a window of 5 seconds, that "tumbles". This means that elements are - grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. - The notion of time used is controlled by the StreamExecutionEnvironment. - {% highlight java %} -nonKeyedStream.timeWindowAll(Time.seconds(5)); - {% endhighlight %} -

-
Sliding time window all
DataStream → WindowedStream
-

- Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are - grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than - one window (since windows overlap by at least 4 seconds) - The notion of time used is controlled by the StreamExecutionEnvironment. - {% highlight java %} -nonKeyedStream.timeWindowAll(Time.seconds(5), Time.seconds(1)); - {% endhighlight %} -

-
Tumbling count window all
DataStream → WindowedStream
-

- Defines a window of 1000 elements, that "tumbles". This means that elements are - grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, - and every element belongs to exactly one window. - {% highlight java %} -nonKeyedStream.countWindowAll(1000) - {% endhighlight %} -

-
Sliding count window all
DataStream → WindowedStream
-

- Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are - grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, - and every element can belong to more than one window (as windows overlap by at least 900 elements). - {% highlight java %} -nonKeyedStream.countWindowAll(1000, 100) - {% endhighlight %} -

-
- +input + .windowAll() + .(); +{% endhighlight %}
+{% highlight scala %} +val input: DataStream[T] = ... -
- - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Tumbling time window all
DataStream → WindowedStream
-

- Defines a window of 5 seconds, that "tumbles". This means that elements are - grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. - The notion of time used is controlled by the StreamExecutionEnvironment. - {% highlight scala %} -nonKeyedStream.timeWindowAll(Time.seconds(5)); - {% endhighlight %} -

-
Sliding time window all
DataStream → WindowedStream
-

- Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are - grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than - one window (since windows overlap by at least 4 seconds) - The notion of time used is controlled by the StreamExecutionEnvironment. - {% highlight scala %} -nonKeyedStream.timeWindowAll(Time.seconds(5), Time.seconds(1)); - {% endhighlight %} -

-
Tumbling count window all
DataStream → WindowedStream
-

- Defines a window of 1000 elements, that "tumbles". This means that elements are - grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, - and every element belongs to exactly one window. - {% highlight scala %} -nonKeyedStream.countWindowAll(1000) - {% endhighlight %} -

-
Sliding count window all
DataStream → WindowedStream
-

- Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are - grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, - and every element can belong to more than one window (as windows overlap by at least 900 elements). - {% highlight scala %} -nonKeyedStream.countWindowAll(1000, 100) - {% endhighlight %} -

-
- +input + .windowAll() + .() +{% endhighlight %}