# Checkpointing 检验指示 弗林克中的每个函数和运算符都可以是*状态*(有关详细信息,请参阅[与状态一起工作](state.html)。状态函数存储数据跨单个元素/事件的处理,使状态成为任何类型的更精细操作的关键构建块。 为了使状态容错,Flink需要到**检查点**状态。检查点允许Flink恢复流中的状态和位置,以赋予应用程序与无故障执行相同的语义。 [关于流式容错的文档](//ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html)详细介绍了flink的流容错机制背后的技术。 ## Prerequisites 先决条件 Flink的检查点机制与流和状态的持久存储交互。一般而言,它要求: * a _persistent_ (或 _durable_ )数据源,可以重放特定时间量的记录。这样的源的示例是持久消息队列(例如,ApacheKafka、RabbitMQ、AmazonKinesis、Googlepubsub)或文件系统(例如,HDFS、S3、GFS、NFS、CEH等)。)。 * 用于状态的持久存储,通常是分布式文件系统(例如,HDFS、S3、GFS、NFS、CEH等)。) ## Enabling and Configuring Checkpointing 启用和配置检查点 默认情况下,禁用检查点。要启用检查点,请在`StreamExecutionEnvironment`上调用`enableCheckpointing(n)`,其中_n_是以毫秒为单位的检查点间隔。 检查点操作的其他参数包括: * _exactly-once vs. at-least-once_:您可以选择将模式传递到 `enableCheckpointing(n)`方法以在两个保证级别之间进行选择。对于大多数应用而言,精确-一次是优选的。至少-一次对于某些超低延迟(持续几毫秒)应用而言可能是相关的。 * _checkpoint timeout_: 进程检查点终止的时间,如果该检查点到那时还没有完成的话. * _minimum time between checkpoints_: 为了确保流应用程序在检查点之间取得一定的进展,可以定义在检查点之间传递所需的时间。例如,如果将此值设置为 _5000_,则下一个检查点将在上一个检查点完成后5秒钟内启动,而不管检查点持续时间和检查点间隔如何。请注意,这意味着检查点间隔永远不会小于此参数。 通过定义“检查点之间的时间”通常比定义检查点间隔更容易配置应用程序,因为“检查点之间的时间”不容易受到检查点有时比平均时间长的影响(例如,如果目标存储系统暂时慢的话)。 注意,这个值还意味着并发检查点的数目是 _one_。 * _number of concurrent checkpoints_: 默认情况下,当另一个检查点仍在进行中时,系统不会触发另一个检查点。这可以确保拓扑不会花费太多时间在检查点上,并且不会在处理流方面取得进展。它可以允许多个重叠检查点,这对于具有一定处理延迟(例如,函数调用需要一定时间响应的外部服务)但仍然希望执行非常频繁的检查点(100毫秒)来重新处理故障时很少处理的管道来说是很有趣的。 当定义检查点之间的最小时间时,无法使用此选项。 * _externalized checkpoints_: 您可以在外部配置定期检查点。外部化检查点将其元数据写入永久存储,并在作业失败时自动清除。这样,如果您的作业失败,您将有一个检查点以恢复。[外部化检查点的部署说明](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/checkpoints.html#externalized-checkpoints)中有更多详细信息。 * _fail/continue task on checkpoint errors_: 这将确定如果任务的检查点过程执行中发生错误,任务是否会失败。这是默认行为。或者,当此操作被禁用时,任务将简单地将检查点拒绝给检查点协调器并继续运行。 ``` StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // start a checkpoint every 1000 ms env.enableCheckpointing(1000); // advanced options: // set mode to exactly-once (this is the default) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig().setCheckpointTimeout(60000); // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained after job cancellation env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); ``` ``` val env = StreamExecutionEnvironment.getExecutionEnvironment() // start a checkpoint every 1000 ms env.enableCheckpointing(1000) // advanced options: // set mode to exactly-once (this is the default) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig.setCheckpointTimeout(60000) // prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined. env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false) // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) ``` ### Related Config Options 相关配置选项 可以通过`conf/flink-conf.yaml`设置更多参数和/或默认值(请参阅[配置](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html)作为完整指南): | Key | Default | Description | | --- | --- | --- | | ##### state.backend | (none) | 用于存储和检查点状态的状态后端。| | ##### state.backend.async | true | 选项是否应在可能的情况下使用异步快照方法并进行配置。有些状态后端可能不支持异步快照,或者只支持异步快照,而忽略此选项。| | ##### state.backend.fs.memory-threshold | 1024 | 状态数据文件的最小大小。所有小于该状态块的状态块都内联存储在根检查点元数据文件中。 | | ##### state.backend.incremental | false | 选项状态后端是否应创建增量检查点(如果可能)。对于增量检查点,仅存储与上一个检查点不同的值,而不是完整的检查点状态。某些状态后端可能不支持增量检查点并忽略此选项。| | ##### state.backend.local-recovery | false | 选项状态后端是否应创建增量检查点(如果可能)。对于增量检查点,仅存储与上一个检查点不同的值,而不是完整的检查点状态。某些状态后端可能不支持增量检查点并忽略此选项。| | ##### state.checkpoints.dir | (none) | 用于在FLink支持的文件系统中存储检查点数据文件和元数据的默认目录。必须从所有参与进程/节点访问存储路径(即,所有任务管理器和作业管理器)。| | ##### state.checkpoints.num-retained | 1 | 要保留的已完成检查点的最大数量。 | | ##### state.savepoints.dir | (none) | 保存点的默认目录。用于将保存点写入文件系统的状态后端(MemoryStateBackend、FsStateBackend、RocksDBStateBackend)。 | | ##### taskmanager.state.local.root-dirs | (none) | 配置参数定义了用于存储用于本地恢复的基于文件的状态的根目录。本地恢复当前仅覆盖键控状态后端。当前,MemoryStateBackend不支持本地恢复并忽略此选项| ## Selecting a State Backend 选择状态后端 flink的[checkpoint机制](//ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html)存储定时器和有状态运算符的所有状态的一致快照,包括连接器、Windows和任何[用户定义的状态](state.html)。其中存储检查点(例如,JobManager内存、文件系统、数据库)取决于所配置的**状态后端**。 默认情况下,状态保存在TaskManager中的内存中,检查点存储在JobManager中的内存中。为了适当地保持大状态,Flink支持在其他状态后端存储和检查点状态的各种方法。可以通过`StreamExecutionEnvironment.setStateBackend(…)配置状态后端的选择。``。 有关作业范围和集群范围内配置的可用状态后端和选项的详细信息,请参见[State backends](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html)]。 ## State Checkpoints in Iterative Jobs 迭代作业中的状态检查点 Flink目前只为没有迭代的作业提供处理保证。在迭代作业上启用检查点会导致异常。为了在迭代程序上强制检查点,用户在启用检查点时需要设置一个特殊标志:‘env.enableCheckpoint(Interval,force=true)’。 请注意,在循环边缘的飞行记录(以及与它们相关的状态更改)将在失败期间丢失。 ## Restart Strategies 重启策略 Flink支持不同的重新启动策略,这些策略控制在发生故障时如何重新启动作业。有关更多信息,请参见[重新启动Strategies](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/restart_strategies.html).