Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<ahref="#top"></a>
Flink's fault tolerance mechanism recovers programs in the presence of failures and
continues to execute them. Such failures include machine hardware failures, network failures,
transient program failures, etc.
* This will be replaced by the TOC
{:toc}
Streaming Fault Tolerance (DataStream API)
------------------------------------------
Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a *persistent* (or *durable*) source that
can be asked for prior records again (Apache Kafka is a good example of such a source).
The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state (see [Working with State]({{ site.baseurl }}/apis/streaming_guide.html#working-with-state)) consistently to provide *exactly once* processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured [state backend]({{ site.baseurl }}/apis/state_backends.html).
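For example, a durable checkpoint location can be configured programmatically on the environment (a sketch, assuming the `FsStateBackend` class; the package path and the HDFS address are illustrative and may differ between Flink versions):

{% highlight scala %}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// store checkpoints in a durable file system instead of JobManager memory
// (the HDFS address below is illustrative)
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"))
{% endhighlight %}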
The [docs on streaming fault tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html) describe in detail the technique behind Flink's streaming fault tolerance mechanism.
To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
Other parameters for checkpointing include:
- *Number of retries*: The `setNumberOfExecutionRetries()` method defines how many times the job is restarted after a failure.
When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.
- *Exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently a few milliseconds) applications.
- *Number of concurrent checkpoints*: By default, the system does not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpointing at the expense of processing the streams. It is possible to allow multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example, because the functions call external services that need some time to respond) but that still want to checkpoint very frequently (every few hundred milliseconds) to re-process very little upon failures.
- *Checkpoint timeout*: The time after which a checkpoint-in-progress is aborted if it has not completed by then.
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setNumberOfExecutionRetries(3)
{% endhighlight %}
</div>
</div>
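Putting the checkpointing parameters above together, they can be set on the `StreamExecutionEnvironment` (a sketch, assuming the Scala DataStream API; the interval and mode values are illustrative):

{% highlight scala %}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// trigger a checkpoint every 5 seconds with exactly-once guarantees
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

// restart the job at most 3 times after a failure
env.setNumberOfExecutionRetries(3)
{% endhighlight %}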
You can also define default values for the number of execution retries and the retry delay in the `flink-conf.yaml`:
~~~
execution-retries.default: 3
execution-retries.delay: 10 s
~~~
Retry Delays
------------
Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start
immediately, but only after a certain delay.
Delaying the retries can be helpful when the program interacts with external systems, for example when connections or pending transactions should time out before re-execution is attempted.
You can set the retry delay for each program as follows (the sample shows the DataStream API - the DataSet API works similarly):
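A sketch of setting the retry delay programmatically, assuming the `setExecutionRetryDelay` setter on the environment's `ExecutionConfig` (the delay value is illustrative):

{% highlight scala %}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setNumberOfExecutionRetries(3)

// wait 5 seconds between a failed execution and the next retry attempt
env.getConfig.setExecutionRetryDelay(5000)
{% endhighlight %}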
Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the
snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
not for other sources. The following table lists the delivery guarantees of the bundled sinks:
| Sink | Guarantees | Notes |
|------|------------|-------|
| HDFS rolling sink | exactly once | Implementation depends on Hadoop version |
| Elasticsearch | at least once | Duplicates need to be handled in Elasticsearch |
| File sinks | at least once | |
| Socket sinks | at least once | |
| Standard output | at least once | |
-->
The [Fault Tolerance Documentation]({{ site.baseurl }}/apis/fault_tolerance.html) describes the options and parameters to enable and configure Flink's checkpointing mechanism.