This section is relevant for programs running on **event time**. For an introduction to _event time_, _processing time_, and _ingestion time_, please refer to the [introduction to event time](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html).
In order to work with _event time_, Flink needs to know the events’ _timestamps_, meaning each element in the stream needs to have its event timestamp _assigned_. This is usually done by accessing/extracting the timestamp from some field in the element.
Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about progress in event time.
There are two ways to assign timestamps and generate watermarks:
1. Directly in the data stream source
2. Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted
**Attention:** Both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
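As a small illustration of that unit, the following plain-Java sketch converts an ISO-8601 UTC instant into milliseconds since the epoch using only `java.time`. The class name `EpochMillisExample` and the helper `toEpochMillis` are hypothetical names chosen for this example and are not part of Flink.

```java
import java.time.Instant;

class EpochMillisExample {
    // Converts an ISO-8601 UTC instant (e.g. "1970-01-01T00:00:01.500Z")
    // to milliseconds since 1970-01-01T00:00:00Z.
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("1970-01-01T00:00:00Z"));     // prints 0
        System.out.println(toEpochMillis("1970-01-01T00:00:01.500Z")); // prints 1500
    }
}
```

A value produced this way has the same unit as the timestamps and watermarks discussed in this section.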
### Source Functions with Timestamps and Watermarks
Stream sources can directly assign timestamps to the elements they produce, and they can also emit watermarks. When this is done, no timestamp assigner is needed. Note that if a timestamp assigner is used, any timestamps and watermarks provided by the source will be overwritten.
To assign a timestamp to an element in the source directly, the source must use the `collectWithTimestamp(...)` method on the `SourceContext`. To generate watermarks, the source must call the `emitWatermark(Watermark)` method.
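The calling pattern can be sketched in plain Java without a Flink dependency. Everything below is an illustrative stand-in: `SimpleSourceContext` is a hypothetical, reduced substitute for Flink's `SourceContext` (it takes the watermark as a plain `long` rather than a `Watermark` object), and `RecordingContext` and `run` are invented for this example. The sketch assumes strictly ascending timestamps, so that emitting a watermark equal to the last timestamp is safe.

```java
import java.util.ArrayList;
import java.util.List;

class TimestampedSourceSketch {
    // Hypothetical stand-in for Flink's SourceContext, reduced to the two calls discussed above.
    interface SimpleSourceContext<T> {
        void collectWithTimestamp(T element, long timestampMs);
        void emitWatermark(long watermarkMs);
    }

    // Records everything the source emits so the order of calls can be inspected.
    static final class RecordingContext implements SimpleSourceContext<String> {
        final List<String> log = new ArrayList<>();

        @Override
        public void collectWithTimestamp(String element, long timestampMs) {
            log.add("element " + element + " @ " + timestampMs);
        }

        @Override
        public void emitWatermark(long watermarkMs) {
            log.add("watermark " + watermarkMs);
        }
    }

    // Emits each element with its timestamp, followed by a watermark.
    // Assumption: timestampsMs is strictly ascending.
    static void run(SimpleSourceContext<String> ctx, String[] elements, long[] timestampsMs) {
        for (int i = 0; i < elements.length; i++) {
            ctx.collectWithTimestamp(elements[i], timestampsMs[i]);
            ctx.emitWatermark(timestampsMs[i]);
        }
    }

    public static void main(String[] args) {
        RecordingContext ctx = new RecordingContext();
        run(ctx, new String[] {"a", "b"}, new long[] {1000L, 2000L});
        ctx.log.forEach(System.out::println);
    }
}
```

In a real job the two calls would be made inside the source's `run` method against the `SourceContext` that Flink passes in.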
Timestamp assigners take a stream and produce a new stream with timestamped elements and watermarks. If the original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
Timestamp assigners are usually specified immediately after the data source, but it is not strictly required to do so. A common pattern, for example, is to parse (_MapFunction_) and filter (_FilterFunction_) before the timestamp assigner. In any case, the timestamp assigner needs to be specified before the first operation on event time (such as the first window operation). As a special case, when using Kafka as the source of a streaming job, Flink allows the specification of a timestamp assigner / watermark emitter inside the source (or consumer) itself. More information on how to do so can be found in the [Kafka Connector documentation](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html).
**NOTE:** The remainder of this section presents the main interfaces a programmer has to implement in order to create her own timestamp extractors/watermark emitters. To see the pre-implemented extractors that ship with Flink, please refer to the [Pre-defined Timestamp Extractors / Watermark Emitters](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamp_extractors.html) page.
`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).
The interval (every _n_ milliseconds) in which the watermark will be generated is defined via `ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner’s `getCurrentWatermark()` method will be called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous watermark.
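The emission rule just described can be modeled in plain Java. This is a simplified sketch, not Flink's implementation: the class `PeriodicWatermarkSketch`, the method `onPeriodicTick()`, and the parameter `maxOutOfOrdernessMs` are hypothetical names, and `extractTimestamp` and `getCurrentWatermark` only mirror the assigner's method names with simplified signatures (plain `long` values instead of Flink's element and `Watermark` types). The watermark strategy shown, largest timestamp seen minus a fixed bound, is one assumed choice among several.

```java
class PeriodicWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;  // no element seen yet
    private long lastEmittedWatermark = Long.MIN_VALUE; // nothing emitted yet

    PeriodicWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called once per element: remembers the largest timestamp seen so far.
    long extractTimestamp(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return eventTimestampMs;
    }

    // Candidate watermark, or null while no element has been seen.
    Long getCurrentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return null;
        }
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    // Models one periodic tick: returns the watermark that would be emitted,
    // or null if the candidate is null or not larger than the previous watermark.
    Long onPeriodicTick() {
        Long candidate = getCurrentWatermark();
        if (candidate != null && candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
            return candidate;
        }
        return null;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch sketch = new PeriodicWatermarkSketch(3500L);
        sketch.extractTimestamp(10_000L);
        System.out.println(sketch.onPeriodicTick()); // prints 6500
        System.out.println(sketch.onPeriodicTick()); // prints null (no progress since the last tick)
    }
}
```

The second tick emits nothing because the candidate watermark has not advanced, which matches the "larger than the previous watermark" condition above.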
Here we show two simple examples of timestamp assigners that use periodic watermark generation. Note that Flink ships with a `BoundedOutOfOrdernessTimestampExtractor` similar to the `BoundedOutOfOrdernessGenerator` shown below, which you can read about [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness).
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
...
...
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
...
...
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
...
...
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
...
...
#### **With Punctuated Watermarks**
To generate watermarks whenever a certain event indicates that a new watermark might be generated, use `AssignerWithPunctuatedWatermarks`. For this class Flink will first call the `extractTimestamp(...)` method to assign the element a timestamp, and then immediately call the `checkAndGetNextWatermark(...)` method on that element.
The `checkAndGetNextWatermark(...)` method is passed the timestamp that was assigned in the `extractTimestamp(...)` method, and can decide whether it wants to generate a watermark. Whenever the `checkAndGetNextWatermark(...)` method returns a non-null watermark, and that watermark is larger than the latest previous watermark, that new watermark will be emitted.
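The per-element flow described above can be modeled in plain Java. This is a hedged sketch rather than Flink code: the `Event` class with its `watermarkMarker` flag, the `process` method, and the class name `PunctuatedWatermarkSketch` are hypothetical, and the two assigner methods are reproduced with simplified signatures that return a nullable `Long` instead of Flink's `Watermark`.

```java
class PunctuatedWatermarkSketch {
    // Hypothetical event type: a timestamp plus a flag marking "watermark-worthy" events.
    static final class Event {
        final long timestampMs;
        final boolean watermarkMarker;

        Event(long timestampMs, boolean watermarkMarker) {
            this.timestampMs = timestampMs;
            this.watermarkMarker = watermarkMarker;
        }
    }

    private long lastEmittedWatermark = Long.MIN_VALUE; // nothing emitted yet

    // Step 1: assign the element its timestamp.
    long extractTimestamp(Event event) {
        return event.timestampMs;
    }

    // Step 2: decide, for this element, whether to propose a watermark (null means "none").
    Long checkAndGetNextWatermark(Event event, long extractedTimestampMs) {
        return event.watermarkMarker ? extractedTimestampMs : null;
    }

    // Models the handling of one element: returns the watermark that would be emitted,
    // or null if none was proposed or the proposal does not exceed the previous watermark.
    Long process(Event event) {
        long timestamp = extractTimestamp(event);
        Long candidate = checkAndGetNextWatermark(event, timestamp);
        if (candidate != null && candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
            return candidate;
        }
        return null;
    }
}
```

With this model, an unmarked event never yields a watermark, and a marked event yields one only if its timestamp is larger than the last emitted watermark.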
_Note:_ It is possible to generate a watermark on every single event. However, because each watermark causes some computation downstream, an excessive number of watermarks degrades performance.
When using [Apache Kafka](connectors/kafka.html) as a data source, each Kafka partition may have a simple event time pattern (ascending timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel, interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka’s consumer clients work).
In that case, you can use Flink’s Kafka-partition-aware watermark generation. Using that feature, watermarks are generated inside the Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.
For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the [ascending timestamps watermark generator](event_timestamp_extractors.html#assigners-with-ascending-timestamps) will result in perfect overall watermarks.
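To make the merging step concrete, here is a plain-Java model of combining per-partition watermarks. It assumes that merging means taking the minimum over all partitions, which is how watermarks from multiple inputs are generally combined; the class `PartitionWatermarkMergeSketch` and its method `onPartitionWatermark` are hypothetical names and do not correspond to classes in the Kafka connector.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionWatermarkMergeSketch {
    private final Map<Integer, Long> watermarkPerPartition = new HashMap<>();
    private long lastEmittedWatermark = Long.MIN_VALUE; // nothing emitted yet

    PartitionWatermarkMergeSketch(int numPartitions) {
        // Every partition starts with "no watermark yet".
        for (int partition = 0; partition < numPartitions; partition++) {
            watermarkPerPartition.put(partition, Long.MIN_VALUE);
        }
    }

    // Records a new watermark for one partition and returns the merged watermark
    // if it advanced, or null otherwise. The merged watermark is the minimum over
    // all partitions, so the slowest partition determines overall progress.
    Long onPartitionWatermark(int partition, long watermarkMs) {
        // A partition's watermark never moves backwards.
        watermarkPerPartition.merge(partition, watermarkMs, Long::max);

        long merged = Long.MAX_VALUE;
        for (long partitionWatermark : watermarkPerPartition.values()) {
            merged = Math.min(merged, partitionWatermark);
        }
        if (merged > lastEmittedWatermark) {
            lastEmittedWatermark = merged;
            return merged;
        }
        return null;
    }
}
```

In this model, a fast partition cannot push the merged watermark ahead of a slower one, which is why the per-partition patterns survive interleaved consumption.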
The illustrations below show how to use the per-Kafka-partition watermark generation, and how watermarks propagate through the streaming dataflow in that case.
...
...
![Generating Watermarks with awareness for Kafka-partitions](../img/parallel_kafka_watermarks.svg)