在这部分,我们将讨论 Spark Streaming 应用程序在发生故障时的行为。
为了理解 Spark Streaming 提供的语义,让我们记住 Spark RDDs 最基本的容错语义。
Spark 在数据上的操作像 HDFS 或者 S3 这样容错的文件系统一样。因此,所有从容错的数据中产生的 RDDs 也是容错的。然而,这种情况在 Spark Streaming 中不适用,因为在大多数情况下数据通过网络被接收(除非使用 fileStream)。为了对所有产生的 RDDs 实现相同的容错语义属性,接收的数据被复制到群集中 Worker 节点的多个 Spark Executor 之间(默认复制因子是 2)。这将会造成需要在发生故障时去恢复两种文件系统的数据类型 :
此外,我们应该关注两种类型的故障 :
与这个基础知识一起,让我们理解 Spark Streaming 的容错语义。
Streaming 处理系统经常会捕获系统异常并记录执行次数以保障系统容错,其中在这三种条件下可以保障服务,等等。
在任何流处理系统,从广义上讲,处理数据有三个步骤。
接收数据 : 接受数据或者从其他数据源接受数据。
转换数据 : 所接收的数据使用 DStream 和 RDD 变换转化。
输出数据 : 最后变换的数据被推送到外部系统,如文件系统,数据库,DashBoard 等。
如果一个 Stream 应用程序来实现端到端的数据传输,则每个环节都需要一次性完整保障。也就是说,每个记录都必须被接收正好一次,恰好转化一次,被推到下游系统一次。让我们了解在 Spark Streaming 的情况下这些步骤的语义。
接收数据 : 不同的输入源提供不同的容错。详细过程在下一小节中讨论。
转换数据 : 已经接收将被处理一次的所有数据,这要依靠于 RDDs 提供保证。即使有故障,只要接收到的输入数据是可读的,最终通过 RDDs 的转化将始终具有相同的内容。
输出数据 : 定义了数据至少一次输出,因为它依赖于输出数据操作类型(是否支持转换)和定义的下游系统(是否支持事务)。但是,用户可以实现自己的传输机制,以实现只要一次定义。会在后面小节里有更详细的讨论。
不同的输入源提供不同的保障,从至少一次到正好一次。阅读更多的细节。
如果所有的输入数据已经存在如 HDFS 的容错文件系统,Spark Streaming 总是可以从任何故障中恢复所有数据。这种定义,在一次处理后就能恢复。
对于基于 receiver 的输入源,容错的语义既依赖于故障的情形也依赖于 receiver 的类型。正如之前讨论的,有两种类型的 receiver :
选择哪种类型的 receiver 依赖于这些语义。如果一个 worker 节点出现故障,Reliable Receiver 不会丢失数据,Unreliable Receiver 会丢失接收了但是没有复制的数据。如果 driver 节点出现故障,除了以上情况下的数据丢失,所有过去接收并复制到内存中的数据都会丢失,这会影响有状态 transformation 的结果。
为了避免丢失过去接收的数据,Spark 1.2 引入了一个实验性的特征(预写日志机制)write ahead logs,它保存接收的数据到容错存储系统中。有了 write ahead logs 和 Reliable Receiver,我们可以做到零数据丢失以及 exactly-once 语义。
下表总结了根据故障的语义 :
Deployment Scenario | Worker Failure | Driver Failure |
---|---|---|
Spark 1.1 或者更早, 没有 write ahead log 的 Spark 1.2 | 在 Unreliable Receiver 情况下缓冲数据丢失;在 Reliable Receiver 和文件的情况下,零数据丢失 | 在 Unreliable Receiver 情况下缓冲数据丢失;在所有 receiver 情况下,过去的数据丢失;在文件的情况下,零数据丢失 |
带有 write ahead log 的 Spark 1.2 | 在 Reliable Receiver 和文件的情况下,零数据丢失 | 在 Reliable Receiver 和文件的情况下,零数据丢失 |
在 Spark 1.3 中,我们引入了一个新的 kafka Direct 的 API,它可以保证所有 kafka 数据由 spark stream 收到一次。如果实现仅一次输出操作,就可以实现保证终端到终端的一次。这种方法(版本 Spark2.0.2)中进一步讨论 kafka集成指南。
输出操作(例如 foreachRDD
)至少被定义一次,即变换后的数据在一次人工故障的情况下可能会不止一次写入外部实体。虽然可以通过操作 saveAs***Files
保存文件到系统上(具有相同的数据将简单地被覆盖),额外的尝试可能是必要的,以实现一次准确的语义。有两种方法。
幂等更新 : 多次尝试写相同的数据。例如,saveAs***Files
总是写入相同数据生成的文件。
事务更新 : 所有更新事务作出这样的更新是恰好遵循原子性。这样做的方法如下:
foreachRDD
)和 RDD 的分区索引。给这个应用定义唯一标识符标识 blob 数据。更新外部系统与当前事务(即原子性),如果已经被标识过得数据应用已经存在,那么就跳过更新。
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }