diff --git a/docs/apis/streaming/event_time.md b/docs/apis/streaming/event_time.md
index ad01f04e7672568dd08c216549cc7fc996ca3843..90eb67aa236fc8a381e2ba30c388fa2629cbf18d 100644
--- a/docs/apis/streaming/event_time.md
+++ b/docs/apis/streaming/event_time.md
@@ -112,6 +112,7 @@ stream
 {% highlight scala %}
 val env = StreamExecutionEnvironment.getExecutionEnvironment
+
 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 // alternatively:
@@ -141,8 +142,67 @@ to use timestamp assignment and watermark generation in the Flink DataStream API
 # Event Time and Watermarks
-*Note: For a deep introduction to Event Time, please refer also to the paper on the [Dataflow Model](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43864.pdf)*
+*Note: Flink implements many techniques from the Dataflow Model. For a good introduction to Event Time and Watermarks, have a look at these articles:*
+
+  - [Streaming 101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by Tyler Akidau
+  - The [Dataflow Model paper](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43864.pdf)
+
+
+A stream processor that supports *event time* needs a way to measure the progress of event time.
+For example, a window operator that builds hourly windows needs to be notified when event time has reached the
+next full hour, so that the operator can close the window in progress.
+
+*Event Time* can progress independently of *Processing Time* (measured by wall clocks).
+For example, in one program, the current *event time* of an operator can trail slightly behind the processing time
+(accounting for a delay in receiving the latest elements), while both proceed at the same speed. In another streaming
+program, which fast-forwards through data already buffered in a Kafka topic (or another message queue), event time
+can progress by weeks within seconds.
+
+------
+
+The mechanism in Flink to measure progress in event time is **Watermarks**.
+Watermarks flow as part of the data stream and carry a timestamp *t*. A *Watermark(t)* declares that event time has reached time
+*t* in that stream, meaning that all events with a timestamp *t' < t* have occurred.
+
+The figure below shows a stream of events with (logical) timestamps, and watermarks flowing inline. The events are in order
+(with respect to their timestamps), meaning that watermarks are simply periodic markers in the stream with an in-order timestamp.
+
+A data stream with events (in order) and watermarks
+
+Watermarks are crucial for *out-of-order* streams, as shown in the figure below, where events do not occur ordered by their timestamps.
+Watermarks establish points in the stream where all events up to a certain timestamp have occurred. Once these watermarks reach an
+operator, the operator can advance its internal *event time clock* to the value of the watermark.
+
+A data stream with events (out of order) and watermarks
+
+
+## Watermarks in Parallel Streams
+
+Watermarks are generated at source functions, or directly after source functions. Each parallel subtask of a source function usually
+generates its watermarks independently. These watermarks define the event time at that particular parallel source.
+
+As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an
+operator advances its event time, it generates a new watermark downstream for its successor operators.
+
+Operators that consume multiple input streams (e.g., after a *keyBy(...)* or *partition(...)* function, or a union) track the event time
+on each of their input streams. The operator's current event time is the minimum of the input streams' event times. As the input streams
+update their event time, so does the operator.
+
+The figure below shows an example of events and watermarks flowing through parallel streams, and operators tracking event time.
+
+Parallel data streams and operators with events and watermarks
+
+
+## Late Elements
+It is possible that certain elements violate the watermark condition, meaning that even after the *Watermark(t)* has occurred,
+more elements with timestamp *t' < t* will occur.
+In fact, in many real-world setups, certain elements can be arbitrarily
+delayed, which makes it impossible to define a time when all elements of a certain event timestamp have occurred.
+Furthermore, even if the lateness can be bounded, delaying the watermarks by too much is often not desirable, because it delays
+the evaluation of the event time windows by too much.
+Due to that, some streaming programs will explicitly expect a number of *late* elements. Late elements are elements that
+arrive after the system's event time clock (as signaled by the watermarks) has already passed the time of the late element's
+timestamp.
diff --git a/docs/apis/streaming/fig/parallel_streams_watermarks.svg b/docs/apis/streaming/fig/parallel_streams_watermarks.svg
new file mode 100644
index 0000000000000000000000000000000000000000..f6a4c4b69742b1ac3eef058e1e6b7e97cc900891
--- /dev/null
+++ b/docs/apis/streaming/fig/parallel_streams_watermarks.svg
@@ -0,0 +1,516 @@
[SVG figure, markup not preserved. Text labels: operators Source (1), Source (2), map (1), map (2), window (1), window (2); events shown as id|timestamp (A|30, B|31, C|30, D|15, E|30, F|30, G|18, H|20, B|35); watermarks W(33) and W(17); legend entries "Watermark", "Event [id|timestamp]", "Event Time at the operator", "Event Time at input streams".]
diff --git a/docs/apis/streaming/fig/stream_watermark_in_order.svg b/docs/apis/streaming/fig/stream_watermark_in_order.svg
new file mode 100644
index 0000000000000000000000000000000000000000..dcdbbc6f65558524ac3531e80f77d753a50644a7
--- /dev/null
+++ b/docs/apis/streaming/fig/stream_watermark_in_order.svg
@@ -0,0 +1,314 @@
[SVG figure, markup not preserved. Text labels: "Stream (in order)"; events with timestamps ranging from 7 to 23; watermarks W(11) and W(20); legend entries "Watermark", "Event", "Event timestamp".]
diff --git a/docs/apis/streaming/fig/stream_watermark_out_of_order.svg b/docs/apis/streaming/fig/stream_watermark_out_of_order.svg
new file mode 100644
index 0000000000000000000000000000000000000000..e8f80a0ad3af3a8d7c0a7e17ada38ac15265e5cd
--- /dev/null
+++ b/docs/apis/streaming/fig/stream_watermark_out_of_order.svg
@@ -0,0 +1,314 @@
[SVG figure, markup not preserved. Text labels: "Stream (out of order)"; events with timestamps 7, 11, 15, 9, 12, 14, 17, 12, 22, 20, 17, 19, 21 (listed in SVG element order); watermarks W(11) and W(17); legend entries "Watermark", "Event", "Event timestamp".]