You can think of Keyed State as Operator State that has been partitioned, or sharded, with exactly one state-partition per key. Each keyed-state is logically bound to a unique composite of <parallel-operator-instance, key>, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>.
Keyed State is further organized into so-called _Key Groups_. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.
With _Operator State_ (or _non-keyed state_), each operator state is bound to one parallel operator instance. The [Kafka Connector](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html) is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
The Operator State interfaces support redistributing state among parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.
_Managed State_is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. Examples are “ValueState”, “ListState”, etc. Flink’s runtime encodes the states and writes them into the checkpoints.
_Raw State_is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint. Flink knows nothing about the state’s data structures and sees only the raw bytes.
All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. Using managed state (rather than raw state) is recommended, since with managed state Flink is able to automatically redistribute state when the parallelism is changed, and also do better memory management.
Attention If your managed state needs custom serialization logic, please see the [corresponding guide](custom_serialization.html) in order to ensure future compatibility. Flink’s default serializers don’t need special treatment.
The managed keyed state interface provides access to different types of state that are all scoped to the key of the current input element. This means that this type of state can only be used on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
Now, we will first look at the different types of state available and then we will see how they can be used in a program. The available state primitives are:
现在,我们将首先查看可用的不同类型的状态,然后我们将看到它们如何在程序中使用。可用的状态基元是:
*`ValueState<T>`: This keeps a value that can be updated and retrieved (scoped to key of the input element as mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using `update(T)` and retrieved using `T value()`.
*`ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable` over all currently stored elements. Elements are added using `add(T)` or `addAll(List<T>)`, the Iterable can be retrieved using `Iterable<T> get()`. You can also override the existing list with `update(List<T>)`
*`ReducingState<T>`: This keeps a single value that represents the aggregation of all values added to the state. The interface is similar to `ListState` but elements added using `add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
*`AggregatingState<IN, OUT>`: This keeps a single value that represents the aggregation of all values added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type of elements that are added to the state. The interface is the same as for `ListState` but elements added using `add(IN)` are aggregated using a specified `AggregateFunction`.
*`FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type of elements that are added to the state. The interface is similar to `ListState` but elements added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
*`MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
All types of state also have a method `clear()` that clears the state for the currently active key, i.e. the key of the input element.
所有类型的状态都有一个方法 `clear()`,用于清除当前活动键的状态,即输入元素的键。
Attention `FoldingState` and `FoldingStateDescriptor` have been deprecated in Flink 1.4 and will be completely removed in the future. Please use `AggregatingState` and `AggregatingStateDescriptor` instead.
It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside but might reside on disk or somewhere else. The second thing to keep in mind is that the value you get from the state depends on the key of the input element. So the value you get in one invocation of your user function can differ from the value in another invocation if the keys involved are different.
To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state (as we will see later, you can create several states, and they have to have unique names so that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a `ReduceFunction`. Depending on what type of state you want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`, a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`.
要获得状态句柄,必须创建一个`StateDescriptor`。这保存了状态的名称(我们稍后会看到,您可以创建几个状态,它们必须有唯一的名称,以便您可以引用它们)、状态所持有的值的类型,以及可能是用户指定的函数,例如`ReduceFunction`。根据要检索的状态类型,可以创建 `ValueStateDescriptor`, a `ListStateDescriptor`, a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`。
State is accessed using the `RuntimeContext`, so it is only possible in _rich functions_. Please see [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/api_concepts.html#rich-functions) for information about that, but we will also see an example shortly. The `RuntimeContext` that is available in a `RichFunction` has these methods for accessing state:
This example implements a poor man’s counting window. We key the tuples by the first field (in the example all have the same key `1`). The function stores the count and a running sum in a `ValueState`. Once the count reaches 2 it will emit the average and clear the state so that we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field.
A _time-to-live_ (TTL) can be assigned to the keyed state of any type. If a TTL is configured and a state value has expired, the stored value will be cleaned up on a best effort basis which is discussed in more detail below.
a _time-to-live_ (Ttl)可以分配给任意类型的键控状态。如果配置了一个TTL,并且状态值已经过期,则将在尽最大努力的基础上清理存储的值,下文将对此进行更详细的讨论。
All state collection types support per-entry TTLs. This means that list elements and map entries expire independently.
所有状态集合类型都支持每个入口TTL。这意味着列表元素和映射项将独立过期。
In order to use state TTL one must first build a `StateTtlConfig` configuration object. The TTL functionality can then be enabled in any state descriptor by passing the configuration:
In case of `NeverReturnExpired`, the expired state behaves as if it does not exist anymore, even if it still has to be removed. The option can be useful for use cases where data has to become unavailable for read access strictly after TTL, e.g. application working with privacy sensitive data.
Another option `ReturnExpiredIfNotCleanedUp` allows to return the expired state before its cleanup.
另一个选项`ReturnExpiredIfNotCleanedUp` 允许在清理之前返回过期状态。
**Notes:**
**注意:**
*The state backends store the timestamp of the last modification along with the user value, which means that enabling this feature increases consumption of state storage. Heap state backend stores an additional Java object with a reference to the user state object and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry.
*Only TTLs in reference to _processing time_ are currently supported.
*当前只支持引用 _processing time_ 的TTL。
*Trying to restore state, which was previously configured without TTL, using TTL enabled descriptor or vice versa will lead to compatibility failure and `StateMigrationException`.
*The map state with TTL currently supports null user values only if the user value serializer can handle null values. If the serializer does not support null values, it can be wrapped with `NullableSerializer` at the cost of an extra byte in the serialized form.
Attention This means that by default if expired state is not read, it won’t be removed, possibly leading to ever growing state. This might change in future releases.
Additionally, you can activate the cleanup at the moment of taking the full state snapshot which will reduce its size. The local state is not cleaned up under the current implementation but it will not include the removed expired state in case of restoration from the previous snapshot. It can be configured in `StateTtlConfig`:
@@ -297,13 +298,13 @@ val ttlConfig = StateTtlConfig
This option is not applicable for the incremental checkpointing in the RocksDB state backend.
此选项不适用于RocksDB状态后端中的增量检查点。
More strategies will be added in the future for cleaning up expired state automatically in the background.
更多的策略将添加在未来的清理过期状态自动在后台。
### State in the Scala DataStream API
### State in the Scala DataStream API Scala 数据流API中的状态
In addition to the interface described above, the Scala API has shortcuts for stateful `map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. The user function gets the current value of the `ValueState` in an `Option` and must return an updated value that will be used to update the state.
@@ -321,13 +322,14 @@ val counts: DataStream[(String, Int)] = stream
## Using Managed Operator State
## Using Managed Operator State 使用托管运营商状态
To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction` interface, or the `ListCheckpointed<T extends Serializable>` interface.
The `CheckpointedFunction` interface provides access to non-keyed state with different redistribution schemes. It requires the implementation of two methods:
Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not only the place where different types of state are initialized, but also where state recovery logic is included.
Currently, list-style managed operator state is supported. The state is expected to be a `List` of _serializable_ objects, independent from each other, thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which non-keyed state can be redistributed. Depending on the state accessing method, the following redistribution schemes are defined:
***Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty, or contain one or more elements. As an example, if with parallelism 1 the checkpointed state of an operator contains elements `element1` and `element2`, when increasing the parallelism to 2, `element1` may end up in operator instance 0, while `element2` will go to operator instance 1.
***Union redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, each operator gets the complete list of state elements.
Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction` to buffer elements before sending them to the outside world. It demonstrates the basic even-split redistribution list state:
@@ -455,9 +457,9 @@ class BufferingSink(threshold: Int = 0)
The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize the non-keyed state “containers”. These are a container of type `ListState` where the non-keyed state objects are going to be stored upon checkpointing.
Note how the state is initialized, similar to keyed state, with a `StateDescriptor` that contains the state name and information about the type of the value that the state holds:
The naming convention of the state access methods contain its redistribution pattern followed by its state structure. For example, to use list state with the union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`. If the method name does not contain the redistribution pattern, _e.g._ `getListState(descriptor)`, it simply implies that the basic even-split redistribution scheme will be used.
After initializing the container, we use the `isRestored()` method of the context to check if we are recovering after a failure. If this is `true`, _i.e._ we are recovering, the restore logic is applied.
As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done using the provided `FunctionInitializationContext`.
The `ListCheckpointed` interface is a more limited variant of `CheckpointedFunction`, which only supports list-style state with even-split redistribution scheme on restore. It also requires the implementation of two methods:
On `snapshotState()` the operator should return a list of objects to checkpoint and `restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
Stateful sources require a bit more care as opposed to other operators. In order to make the updates to the state and output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source’s context.
Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.