* state (fault-tolerant, consistent, only on keyed stream)
* timers (event time and processing time, only on keyed stream)
* 状态(容错、一致,仅在键控流中)
* 定时器(事件时间和处理时间,仅在键控流中)
The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s).
For fault-tolerant state, the `ProcessFunction` gives access to Flink’s [keyed state](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html), accessible via the `RuntimeContext`, similar to the way other stateful functions can access keyed state.
The timers allow applications to react to changes in processing time and in [event time](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html). Every call to the function `processElement(...)` gets a `Context` object which gives access to the element’s event time timestamp, and to the _TimerService_. The `TimerService` can be used to register callbacks for future event-/processing-time instants. When a timer’s particular time is reached, the `onTimer(...)` method is called. During that call, all states are again scoped to the key with which the timer was created, allowing timers to manipulate keyed state.
To realize low-level operations on two inputs, applications can use `CoProcessFunction`. This function is bound to two different inputs and gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
Implementing a low level join typically follows this pattern:
实现低级别连接通常遵循以下模式:
* Create a state object for one input (or both)
* 为一个输入创建状态对象(或两者)
* Update the state upon receiving elements from its input
* 从其输入接收元素时更新状态
* Upon receiving elements from the other input, probe the state and produce the joined result
* 接收来自其他输入的元素后,探测状态并生成连接结果。
For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.
* The count, key, and last-modification-timestamp are stored in a `ValueState`, which is implicitly scoped by key.
* 计数、密钥和最后修改时间戳存储在一个“Value State”中,该状态由密钥隐式范围。
* For each record, the `ProcessFunction` increments the counter and sets the last-modification timestamp
* 对于每个记录,“ProcessFunction”将递增计数器并设置最后修改时间戳
* The function also schedules a callback one minute into the future (in event time)
* 该函数还将回调调度为将来(在事件时间内)
* Upon each callback, it checks the callback’s event time timestamp against the last-modification time of the stored count and emits the key/count if they match (i.e., no further update occurred during that minute)
@@ -193,10 +216,13 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
**NOTE:** Before Flink 1.4.0, when called from a processing-time timer, the `ProcessFunction.onTimer()` method sets the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it’s harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic depends on this wrong timestamp highly likely is unintendedly faulty. So we’ve decided to fix it. Upon upgrading to 1.4.0, Flink jobs that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
Both types of timers (processing-time and event-time) are internally maintained by the `TimerService` and enqueued for execution.
这两种类型的定时器(处理时间和事件时间)都由‘TimerService’内部维护,并排队等待执行。
The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.
Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored.
Note Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately. This might happen when an application recovers from a failure or when it is started from a savepoint.
Note Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`). Notice that large numbers of timers can increase the checkpointing time because timers are part of the checkpointed state. See the “Timer Coalescing” section for advice on how to reduce the number of timers.
Since Flink maintains only one timer per key and timestamp, you can reduce the number of timers by reducing the timer resolution to coalesce them.
因为Flink每个键只维护一个计时器和时间戳,所以您可以通过降低定时器分辨率来减少计时器的数量。
For a timer resolution of 1 second (event or processing time), you can round down the target time to full seconds. Timers will fire at most 1 second earlier but not later than requested with millisecond accuracy. As a result, there are at most one timer per key and second.
Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce these timers with the next watermark by using the current one: