Spark Streaming 是 Spark Core API 的扩展,它支持弹性的,高吞吐的,容错的实时数据流的处理。数据可以通过多种数据源获取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也可以通过例如 map,reduce,join,window 等的高阶函数组成的复杂算法处理。最终,处理后的数据可以输出到文件系统,数据库以及实时仪表盘中。事实上,你还可以在数据流上使用 Spark机器学习 以及 图形处理算法 。
在内部,它工作原理如下,Spark Streaming 接收实时输入数据流并将数据切分成多个批数据,然后交由 Spark 引擎处理并分批的生成结果数据流。
Spark Streaming 提供了一个高层次的抽象叫做离散流(discretized stream)或者 DStream,它代表一个连续的数据流。DStream 可以通过来自数据源的输入数据流创建,例如 Kafka,Flume 以及 Kinesis,或者在其他 DStream 上进行高层次的操作创建。在内部,一个 DStream 是通过一系列的 RDD 来表示。
本指南告诉你如何使用 DStream 来编写一个 Spark Streaming 程序.你可以使用 Scala ,Java 或者 Python(Spark 1.2 版本后引进)来编写程序。
注意 :
在 Python 有些 API 可能会有不同或不可用,在这个指南里你将会发现 Python API 的标签来高亮显示这些不同。
在我们详细介绍如何你自己的 Spark Streaming 程序的细节之前,让我们先来看一看一个简单的 Spark Streaming 程序的样子。比方说,我们想要计算从一个监听 TCP socket 的数据服务器接收到的文本数据(text data)中的字数。所有你需要做的就是照着下面的步骤做。
首先,我们导入了 Spark Streaming 类和部分从 StreamingContext 隐式转换到我们的环境的名称,目的是添加有用的方法到我们需要的其他类(如 DStream)。 StreamingContext 是所有流功能的主要入口点。我们创建了一个带有 2 个执行线程和间歇时间为 1 秒的本地 StreamingContext 。
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ //自从 Spark 1.3 开始,不再是必要的了 // 创建一个具有两个工作线程(working thread)和批次间隔为1秒的本地 StreamingContext // master 需要 2 个核,以防止饥饿情况(starvation scenario)。 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1))
在使用这种背景下,我们可以创建一个代表从 TCP 源流数据的离散流(DStream),指定主机名(hostname)(例如 localhost)和端口(例如 9999)。
// 创建一个将要连接到 hostname:port 的离散流,如 localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)
上一步的这个 lines 离散流(DStream)表示将要从数据服务器接收到的数据流。在这个 离散流(DStream)中的每一条记录都是一行文本(text)。接下来,我们想要通过空格字符(space characters)拆分这些数据行(lines)成单词(words)。
// 将每一行拆分成单词 val words = lines.flatMap(_.split(" ")) val words = lines.flatMap(_.split(" "))
flatMap 是一种一对多的离散流(DStream)操作,它会通过在源离散流(source DStream)中根据每个记录(record)生成多个新纪录的形式创建一个新的离散流(DStream)。在这种情况下,在这种情况下,每一行(each line)都将被拆分成多个单词(words)和代表单词离散流(words DStream)的单词流。接下来,我们想要计算这些单词。
import org.apache.spark.streaming.StreamingContext._ // 自从 Spark 1.3 不再是必要的 // 计算每一个批次中的每一个单词 val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素 // 注意 : 必需要触发 action(很多初学者会忘记触发action操作,导致报错:No output operations registered, so nothing to execute) wordCounts.print()
上一步的 words 离散流进行了进一步的映射(一对一的转变)为一个 (word, 1)对 的离散流(DStream),这个离散流然后被规约(reduce)来获得数据中每个批次(batch)的单词频率。最后,wordCounts.print() 将会打印一些每秒生成的计数。
请注意,当这些行(lines)被执行的时候, Spark Streaming 只有建立在启动时才会执行计算,在它已经开始之后,并没有真正地处理。为了在所有的转换都已经完成之后开始处理,我们在最后运行 :
ssc.start() // 启动计算 ssc.awaitTermination() // 等待计算的终止
完整的代码可以在 Spark Streaming 的例子 NetworkWordCount 中找到。
如果你已经 下载 并且 建立 了 Spark ,你可以运行下面的例子。你首先需要运行 Netcat(一个在大多数类 Unix 系统中的小工具)作为我们使用的数据服务器。
$ nc -lk 9999
然后,在另一个不同的终端,你可以运行这个例子通过执行 :
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
然后,在运行在 netcat 服务器上的终端输入的任何行(lines),都将被计算,并且每一秒都显示在屏幕上,它看起来就像下面这样 :
接下来,我们了解完了简单的例子,开始阐述 Spark Streaming 的基本知识。
与 Spark 类似, Spark Streaming 可以通过 Maven 来管理依赖。为了写你自己的 Spark Streaming 程序,你必须添加以下的依赖到你的 SBT 或者 Maven 项目中。
Maven 示例 :
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.0.2</version> </dependency>
对于从现在没有在 Spark Streaming Core API 中的数据源获取数据,如 Kafka, Flume,Kinesis ,你必须添加相应的组件 spark-streaming-xyz_2.11 到依赖中。例如,有一些常见的依赖如下。
源 | 坐标 |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
想要查看一个实时更新的列表,请参阅 Maven repository 来了解支持的源文件和组件的完整列表。
为了初始化一个 Spark Streaming 程序,一个 StreamingContext 对象必须要被创建出来,它是所有的 Spark Streaming 功能的主入口点。
一个 StreamingContext 对象可以从一个 SparkConf 对象中创建出来。
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
这个 appName 参数是展示在集群用户界面上的你的应用程序的名称。 master 是一个 Spark, Mesos or YARN cluster URL ,或者一个特殊的 "local[*]" 字符串运行在本地模式下。在实践中,在一个集群上运行时,你不会想在程序中硬编码 master,而是使用 spark-submit 启动应用程序,并且接收这个参数。然而,对于本地测试和单元测试,你可以传递 "local[*]" 去运行 Spark Streaming 过程(检测本地系统中内核的个数)。请注意,这内部创建了一个 SparkContext (所有 Spark 功能的出发点),它可以像这样被访问 ssc.sparkContext。
这个批处理间隔(batch interval)必须根据您的应用程序和可用的集群资源的等待时间要求进行设置。详情请参阅 优化指南 部分。
一个 StreamingContext 对象也可以从一个现有的 SparkContext 对象中创建出。
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
一个 context 定义之后,你必须做以下几个方面。
要记住的要点 :
Discretized Stream(离散化流)或者 DStream(离散流)是 Spark Streaming 提供的基本抽象。它代表了一个连续的数据流,无论是从源接收到的输入数据流,还是通过变换输入流所产生的处理过的数据流。在内部,一个离散流(DStream)被表示为一系列连续的 RDDs,RDD 是 Spark 的一个不可改变的,分布式的数据集的抽象(查看 编程指南了解更多)。在一个 DStream 中的每个 RDD 包含来自一定的时间间隔的数据,如下图所示。
应用于 DStream 的任何操作转化为对于底层的 RDDs 的操作。例如,在先前的例子,转换一个行(lines)流成为单词(words)中,flatMap 操作被应用于在行离散流(lines DStream)中的每个 RDD 来生成单词离散流(words DStream)的 RDDs 。如下图所示。
这些底层的 RDD 变换由 Spark 引擎(engine)计算。 DStream 操作隐藏了大多数这些细节并为了方便起见,提供给了开发者一个更高级别的 API 。这些操作细节会在后边的章节中讨论。
输入 DStreams 是代表输入数据是从流的源数据(streaming sources)接收到的流的 DStream 。在快速简单的例子中,行(lines)是一个输入 DStream ,因为它代表着从 netcat 服务器接收到的数据的流。每个输入离散流(input DStream)(除了文件流(file stream),在后面的章节进行讨论)都会与一个接收器(Scala doc,Java doc)对象联系,这个接收器对象从一个源头接收数据并且存储到 Sparks 的内存中用于处理。
Spark Streaming 提供了两种内置的流来源(streaming source)。
在本节的后边,我们将讨论每种类别中的现有的一些来源。
需要注意的是,如果你想要在你的流处理程序中并行的接收多个数据流,你可以创建多个输入离散流(input DStreams)(在 性能调整 部分进一步讨论)。这将创建同时接收多个数据流的多个接收器(receivers)。但需要注意,一个 Spark 的 worker/executor 是一个长期运行的任务(task),因此它将占用分配给 Spark Streaming 的应用程序的所有核中的一个核(core)。因此,要记住,一个 Spark Streaming 应用需要分配足够的核(core)(或线程(threads),如果本地运行的话)来处理所接收的数据,以及来运行接收器(receiver(s))。
需要记住的要点
我们已经简单地了解过了 ssc.socketTextStream(...) 在快速开始的例子中,例子中是从通过一个 TCP socket 连接接收到的文本数据中创建了一个离散流(DStream)。除了 sockets,StreamingContext API 也提供了根据文件作为输入来源创建离散流(DStreams)的方法。
Spark Streaming 将监控 dataDirectory 目录,并处理任何在该目录下创建的文件(写在嵌套目录中的文件是不支持的)。注意 :
对于简单的文本文件,还有一个更加简单的方法 streamingContext.textFileStream(dataDirectory)。并且文件流(file streams)不需要运行一个接收器(receiver),因此,不需要分配内核(core)。
在 Python API 中 Python API fileStream 是不可用的,只有 textFileStream 是可用的。
想要了解更多的关于从 sockets 和文件(files)创建的流的细节,查看相关功能的 API 文档,Scala 中的 StreamingContext ,Java 中的 JavaStreamingContext 和 Python 中的 StreamingContext 。
Python API 在 Spark 2.0.2 中,这些来源中,Kafka,Kinesis 和 Flume 在 Python API 中都是可用的。
这一类别的来源需要使用非 Spark 库中的外部接口,它们中的其中一些还需要比较复杂的依赖关系(例如, Kafka 和 Flume)。因此,为了最小化有关的依赖关系的版本冲突的问题,这些资源本身不能创建 DStream 的功能,它是通过连接单独的类库实现创建 DStream 的功能。
需要注意的是这些高级来源在 Spark Shell 中是不可用的。因此,基于这些高级来源的应用程序不能在 shell 中被测试。如果你真的想要在 Spark shell 中使用它们,你必须下载带有它的依赖的相应的 Maven 组件的 JAR ,并且将其添加到 classpath 。
一些高级来源如下。
在 Python 中 Python API 还不支持自定义来源。
输入离散流(Input DStreams)也可以从创建自定义数据源。所有你需要做的就是实现一个用户定义(user-defined)的接收器(receiver)(查看下一章节去了解那是什么),这个接收器可以从自定义的数据源接收数据并将它推送到 Spark 。查看 自定义接收器指南(Custom Receiver Guide) 来了解更多。
可以有两种基于他们的可靠性的数据源。数据源(如 Kafka 和 Flume)允许传输的数据被确认。如果系统从这些可靠的数据来源接收数据,并且被确认(acknowledges)正确地接收数据,它可以确保数据不会因为任何类型的失败而导致数据丢失。这样就出现了 2 种接收器(receivers):
在 自定义接收器指南(Custom Receiver Guide) 中描述了关于如何去编写一个可靠的接收器的细节。
与 RDD 类似,transformation 允许从 input DStream 输入的数据做修改。DStreams 支持很多在 RDD 中可用的 transformation 算子。一些常用的算子如下所示 :
Transformation(转换) | Meaning(含义) |
---|---|
map(func) | 利用函数 func 处理原 DStream 的每个元素,返回一个新的 DStream。 |
flatMap(func) | 与 map 相似,但是每个输入项可用被映射为 0 个或者多个输出项。 |
filter(func) | 返回一个新的 DStream,它仅仅包含源 DStream 中满足函数 func 的项。 |
repartition(numPartitions) | 通过创建更多或者更少的 partition 改变这个 DStream 的并行级别(level of parallelism)。 |
union(otherStream) | 返回一个新的 DStream,它包含源 DStream 和 otherStream 的所有元素。 |
count() | 通过计算源 DStream 中每个 RDD 的元素数量,返回一个包含单元素(single-element)RDDs 的新 DStream。 |
reduce(func) | 利用函数 func 聚集源 DStream 中每个 RDD 的元素,返回一个包含单元素(single-element)RDDs 的新 DStream。函数应该是相关联的,以使计算可以并行化。 |
countByValue() | 这个算子应用于元素类型为 K 的 DStream上,返回一个(K,long)对的新 DStream,每个键的值是在原 DStream 的每个 RDD 中的频率。 |
reduceByKey(func, [numTasks]) | 当在一个由 (K,V) 对组成的 DStream 上调用这个算子,返回一个新的由 (K,V) 对组成的 DStream,每一个 key 的值均由给定的 reduce 函数聚集起来。 注意:在默认情况下,这个算子利用了 Spark 默认的并发任务数去分组。你可以用 |
join(otherStream, [numTasks]) | 当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, (V, W)) 对的新 DStream。 |
cogroup(otherStream, [numTasks]) | 当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, Seq[V], Seq[W]) 的元组。 |
transform(func) | 通过对源 DStream 的每个 RDD 应用 RDD-to-RDD 函数,创建一个新的 DStream。这个可以在 DStream 中的任何 RDD 操作中使用。 |
updateStateByKey(func) | 利用给定的函数更新 DStream 的状态,返回一个新 "state" 的 DStream。 |
最后两 个transformation 算子需要重点介绍一下 :
updateStateByKey 操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它 :
在每个 batch 中,Spark 会使用更新状态函数为所有的关键字更新状态,不管在 batch 中是否含有新的数据。如果这个更新函数返回一个 none,这个键值对也会被消除。
让我们举个例子说明。在例子中,你想保持一个文本数据流中每个单词的运行次数,运行次数用一个 state 表示,它的类型是整数 :
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) }
这里是一个应用于包含 words(单词)的 DStream 上(也就是说,在 更前面的示例 中,该 pairs DStream 包含了 (word, 1) pair)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
更新函数将会被每个单词调用,newValues
拥有一系列的 1(从 (word, 1) 组对应),runningCount 拥有之前的次数。要看完整的代码,请看 StatefulNetworkWordCount.scala 示例。
注意,使用 updateStateByKey
需要配置的检查点的目录,这里是更详细关于讨论 CheckPointing 的部分。
transform
操作(以及它的变化形式如 transformWith
)允许在 DStream 运行任何 RDD-to-RDD 函数。它能够被用来应用任何没在 DStream API 中提供的 RDD 操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。 例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在 DStream API 中提供,然而你可以简单的利用 transform
方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流 来清理实时数据,然后过了它们,你可以按如下方法来做 :
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... })
请注意,每批间隔提供的函数被调用。 这允许你做 时变抽样操作,即抽样操作,数量的分区,广播变量,批次之间等可以改变。
Spark Streaming 也支持窗口计算,它允许你在一个滑动窗口数据上应用 transformation 算子。下图阐明了这个滑动窗口。
如上图显示,窗口在源 DStream 上滑动,合并和操作落入窗内的源 RDDs,产生窗口化的 DStream 的 RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。 这说明,任何一个窗口操作都需要指定两个参数 :
这两个参数必须是源 DStream 上 batch interval(批时间间隔)的倍数。
下面举例说明窗口操作。例如,你想扩展前面的例子用来计算过去 30 秒的词频,间隔时间是 10 秒。为了达到这个目的,我们必须在过去 30 秒的 pairs
DStream 上应用 reduceByKey
操作。用方法 reduceByKeyAndWindow
实现。
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数 - 窗口长度和滑动的时间间隔。
转换 | 含义 |
---|---|
window ( windowLength , slideInterval ) | 返回一个新的 DStream,基于窗口的批 DStream 来源。 |
countByWindow ( windowLength , slideInterval ) | 返回一个滑动窗口中的元素计算流。 |
reduceByWindow ( func, windowLength , slideInterval ) | 返回一个新创建的单个元素流,通过聚合元素流了 滑动时间间隔使用 函数 。 函数应该关联和交换,以便它可以计算 正确地并行执行。 |
reduceByKeyAndWindow ( func, windowLength , slideInterval , ( numTasks ]) | 当呼吁DStream(K、V)对,返回一个新的DStream(K、V) 对每个键的值在哪里聚合使用给定的reduce函数 函数 在一个滑动窗口批次。 注意: 默认情况下,它使用引发的默认数量 并行任务(2为本地模式,在集群模式是由配置数量 财产 spark.default.parallelism
分组)。 你可以通过一个可选的 numTasks
参数设置不同数量的任务。 |
reduceByKeyAndWindow ( func, invFunc , windowLength , slideInterval ,( numTasks ]) | 上面的reduceByKeyAndWindow()的一个更有效的版本,其中每个窗口的reduce值是使用上一个窗口的reduce值递增计算的。 这是通过减少进入滑动窗口的新数据和“反向减少”离开窗口的旧数据来完成的。 一个例子是在窗口滑动时“添加”和“减去”键的计数。 然而,它仅适用于“可逆缩减函数”,即,具有对应的“逆缩减”函数(作为参数invFunc)的那些缩减函数。 像reduceByKeyAndWindow中一样,reduce任务的数量可通过可选参数进行配置。 请注意,必须启用检查点设置才能使用此操作。 |
countByValueAndWindow ( windowLength , slideInterval ,[ numTasks ]) | 当呼吁DStream(K、V)对,返回一个新的DStream(K,长)对的 每个键的值是它的频率在一个滑动窗口。 就像在 reduceByKeyAndWindow
,通过一个减少任务的数量是可配置的 可选参数。 |
最后,Spark streaming 可以很容易与其它的数据源进行 join。
stream 可以很容易与其他 stream 进行 join
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2)
之类,在每一个批间隔中,生成的抽样 stream1
将与生成的抽样 stream2 进行 join 操作。
也可以做 leftOuterJoin
,
rightOuterJoin
,
fullOuterJoin
。此外,它通常是非常有用的做连接的窗口 (window) stream。 这是非常容易的。
val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
这已经被证明在早些时候解释 DStream.transform
操作。 这是另一个例子,加入一个有窗口的流数据集。
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
事实上,你也可以动态地改变你想加入的数据集。 提供的函数变换
评估每批间隔,因此将使用当前数据集作为
参考点。
DStream 转换的完整列表可以在 API 文档。Scala API 看到 DStream 和 PairDStreamFunctions 。Java API 明白了 JavaDStream 和 JavaPairDStream 。Python API 看到 DStream。
输出操作允许 DStream 的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是 DStream 转换。目前,定义了下面几种输出操作 :
输出操作 | 含义 |
---|---|
print() | 在 DStream 的每个批数据中打印前 10 条元素,这个操作在开发和调试中都非常有用。在 Python API 中调用 pprint() 。 |
saveAsObjectFiles(prefix, [suffix]) | 保存 DStream 的内容为一个序列化的文件 SequenceFile 。每一个批间隔的文件的文件名基于 prefix 和 suffix 生成。"prefix-TIME_IN_MS[.suffix]",在 Python API 中不可用。 |
saveAsTextFiles(prefix, [suffix]) | 保存 DStream 的内容为一个文本文件。每一个批间隔的文件的文件名基于 prefix 和 suffix 生成。"prefix-TIME_IN_MS[.suffix]" |
saveAsHadoopFiles(prefix, [suffix]) | 保存 DStream 的内容为一个 hadoop 文件。每一个批间隔的文件的文件名基于 prefix 和 suffix 生成。"prefix-TIME_IN_MS[.suffix]",在 Python API 中不可用。 |
foreachRDD(func) | 在从流中生成的每个 RDD 上应用函数 需要注意的是, |
dstream.foreachRDD 是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要的。下面几点介绍了如何避免一般错误。
经常写数据到外部系统需要建一个连接对象(例如到远程服务器的 TCP 连接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在 Spark 驱动中创建一个连接对象,但是在 Spark worker 中 尝试调用这个连接对象保存记录到 RDD 中,如下 :
dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }
这是不正确的,因为这需要先序列化连接对象,然后将它从 driver 发送到 worker 中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该在 worker 中初始化)等等。正确的解决办法是在 worker 中创建连接对象。
然而,这会造成另外一个常见的错误 - 为每一个记录创建了一个连接对象。例如 :
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。
其它需要注意的地方 :
dstream.foreachRDD()
,但是没有任何 RDD action 操作在 dstream.foreachRDD()
里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。你可以很容易地使用 DataFrames 和 SQL Streaming 操作数据。 需要使用 SparkContext 或者正在使用的 StreamingContext 创建一个 SparkSession。这样做的目的就是为了使得驱动程序可以在失败之后进行重启。 使用懒加载模式创建单例的 SparkSession 对象。下面的示例所示。在原先的 单词统计 程序的基础上进行修改,使用 DataFrames 和 SQL 生成单词统计。 每个 RDD 转换为 DataFrame,注册为临时表,然后使用 SQL 查询。
/** 流程序中的DataFrame操作 */ val words: DStream[String] = ... words.foreachRDD { rdd => // 获取单例的SQLContext val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext.implicits._ // 将RDD [String]转换为DataFrame val wordsDataFrame = rdd.toDF("word") // 注册临时表 wordsDataFrame.registerTempTable("words") // 在DataFrame上使用SQL进行字计数并打印它 val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
这里是完整 源代码 。
你可以在使用其他线程读取的流数据上进行 SQL 查询(就是说,可以异步运行 StreamingContext)。 只要确保 StreamingContext 可以缓存一定量的数据来满足查询的需求。 否则 StreamingContext,检测不到任何异步 SQL 查询,在完成查询之前将删除旧的数据。 例如,如果您想查询最后一批,但您的查询可以运行需要 5 分钟,然后调用 streamingContext.remember(Minutes(5))(
在 Scala 中,或其他语言)。
请看 DataFrames,Datasets 和 SQL 指南学习更多关于 DataFrames 的知识。
你还可以轻松地使用所提供的机器学习算法 MLlib 。 首先这些streaming,(如机器学习算法。 Streaming 线性回归,StreamingKMeans 等)可以同时学习 Streaming 数据的应用模型。 除了这些,对于一个大得多的机器学习算法,可以学习模型离线(即使用历史数据),然后应用该模型在线流媒体数据。访问 MLlib 指南更多细节。
类似于抽样,DStreams 还允许开发人员持久化 streama €™数据在内存中。 也就是说,在 DStream 上使用 persist() 方法,它会自动把每个抽样持续化到内存中 。 这个非常有用,如果数据多次 DStream(如同样的数据进行多次操作)。 像 reduceByWindow、reduceByKeyAndWindow 和 updateStateByKey 这些都隐式的开启了 “persist()”。 因此,DStreams 生成的窗口操作会自动保存在内存中,如果没有开发人员调用 persist()。
对于通过网络接收数据的输入流(如 Kafka、Flume、Sockets 等),默认的持久性级别被设置为复制两个节点的数据容错。
注意,与抽样不同,默认的序列化数据持久性 DStreams。更多细节在 性能优化 部分查阅。 可以在 Spark 编程指南 中查看不同持久化级别的详情。
一个 Streaming 应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM 崩溃等)。为了使这成为可能,Spark Streaming 需要 checkpoint 足够的信息到容错存储系统中, 以使系统从故障中恢复。
元数据 checkpoint 主要是为了从 driver 故障中恢复数据。如果 transformation 操作被用到了,数据 checkpoint 即使在简单的操作中都是必须的。
应用程序在下面两种情况下必须开启 checkpoint :
注意,没有前述的有状态的 transformation 的简单流应用程序在运行时可以不开启 checkpoint。在这种情况下,从 driver 故障的恢复将是部分恢复(接收到了但是还没有处理的数据将会丢失)。 这通常是可以接受的,许多运行的 Spark Streaming 应用程序都是这种方式。
在容错、可靠的文件系统(HDFS、s3 等)中设置一个目录用于保存 checkpoint 信息。这可以通过 streamingContext.checkpoint(checkpointDirectory) 方法来做。这运行你用之前介绍的 有状态 transformation。另外,如果你想从 driver 故障中恢复,你应该以下面的方式重写你的Streaming 应用程序。
这种配置的方式很简单,通过使用 StreamingContext.getOrCreate 即可,如下所示 :
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
如果 checkpointDirectory 存在,上下文将会利用 checkpoint 数据重新创建。如果这个目录不存在,将会调用 functionToCreateContext 函数创建一个新的上下文,建立 DStream。 请看RecoverableNetworkWordCount 例子。
除了使用 getOrCreate,开发者必须保证在故障发生时,driver 处理自动重启。只能通过部署运行应用程序的基础设施来达到该目的。在部署章节将有更进一步的讨论。
注意,RDD 的 checkpointing 有存储成本。这会导致批数据(包含的 RDD 被 checkpoint)的处理时间增加。因此,需要小心的设置批处理的时间间隔。在最小的批容量(包含 1 秒的数据)情况下,checkpoint 每批数据会显著的减少 操作的吞吐量。相反,checkpointing 太少会导致谱系以及任务大小增大,这会产生有害的影响。因为有状态的 transformation 需要 RDD checkpoint。默认的间隔时间是批间隔时间的倍数,最少 10 秒。它可以通过 dstream.checkpoint 来设置。典型的情况下,设置 checkpoint 间隔是 DStream 的滑动间隔的 5-10 大小是一个好的尝试。
累加器 和 广播变量 是不能从 Spark Streaming 中恢复。 如果要使 checkpoint 可用,累加器 或 广播变量,需要使用 懒加载的方式实例化这两种变量以至于他们能够可以在驱动程序失败重启之后进行再次实例化。下面的示例所示。
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: Accumulator[Long] = null def getInstance(sc: SparkContext): Accumulator[Long] = { if (instance == null) { synchronized { if (instance == null) { instance = sc.accumulator(0L, "WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter += count false } else { true } }.collect() val output = "Counts at time " + time + " " + counts })
完整代码(源代码)。
本节讨论部署 Spark Streaming 应用程序的步骤。
要运行一个 Spark Streaming 应用,你需要有以下几点。
如果运行的 Spark Streaming应用 程序需要升级,有两种可能的方法。
除了 Spark 自己的 监控功能 之外,针对 Spark Streaming 它也有一些其他的功能。当使用 StreamingContext 时,Spark Web UI 显示了一个额外的 Streaming 标签,它显示了有关正在运行的 Receivers(接收器)(是否 Receivers 处于活动状态,记录接受数量,接收器误差,等等)和已完成的 Batches(批处理时间,查询延迟,等等)的统计信息。这可以用于监控 Spark 应用程序的进度。
在 Web UI 中以下两个指标特别重要 :
如果批处理时间始终大于批的间隔 并且(或者)队列延迟不断增加,那么说明系统不能尽可能快的处理 Batches,它们(Batches)正在被生成并且落后(处理),在这种情况下,可以考虑 降低 批处理时间。
Spark Streaming 应用程序处理的进度,也可以使用 StreamingListener 接口监控,它允许你获取 Receiver 的状态以及批处理时间。注意,这是一个开发者 API 并且在将来它很可能被改进(即上报更多的信息)。
为了在集群中获得 Spark Streaming 应用程序的最佳性能需要一些优化。这部分解释了一部分能够调整用来提升您应用程序性能的参数和配置。在一个较高的水平上,您需要考虑两件事情 :
有许多优化可以在 Spark 中来完成,使每批数据的处理时间最小化。这些都在 优化指南 中详细讨论。本章介绍一些最重要的问题。
通过网络(像 Kafka,Flume,socket,等等)接受数据需要数据反序列化然后存在 Spark 中。如果数据接收成为了系统中的瓶颈,则需要考虑并行的数据接收。注意,每个 Input DStream(输入流)创建一个接受单个数据流的单独的 Receiver(接收器)(运行在一个 Worker 机器上)。接受多个数据流因此可以通过创建多个 Input DStreams 以及配置他们去从数据源(S)的不同分区接收数据流来实现。例如,一个单一的 Kafka Input DStream 接收两个 Topic(主题) 的数据能够被拆分成两个 Kafka 输入流,每个仅接收一个 Topic。这将运行两个 Receiver(接收器),使得数据可以被并行接受,因此将提高整体吞吐量。这些多个 DStream 可以合并在一起以创建一个单独的 DStream。然后被用于在一个单独的 Input DStream 上的 Transformations(转换)可以在统一的流上被应用。按照以下步骤进行 :
val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print()
应当考虑的另一个参数是 Receiver(接收器)的阻塞间隔,它通 过配置参数 spark.streaming.blockInterval 来决定。对于大部分 Receiver(接收器) 来说,在存储到 Spark 的 Memory(内存)之前时接收的数据被合并成数据 Block(块)。每批 Block(块)的数量确定了将用于处理在一个类似 Map Transformation(转换)的接收数据的任务数量。每个 Receiver(接收器)的每个 Batch 的任务数量大约为(Batch 间隔 / Block 间隔)。例如,200ms 的 Block(块)间隔和每 2s 的 Batch 将创建 10 个任务。如果任务的数量过低(即,小于每台机器的 Core(CPU)数量),那么效率会很低,因为所有可用的 Core(CPU)将不会用来处理数据。以增加给定的 Batch 间隔的任务数量,降低该 Block(间隔)。然而,Block(块)间隔推荐的最低值约为 50ms,低于该推荐值的任务的运行开销可能是一个问题。
与多个 Input Streams / Receivers 接受数据的另一种方法是明确的 Repartition (重分区)输入的数据流(使用 inputStream.repartition(<partition(分区)的数量>))。这样在进一步处理之前就分发接收数据的 Batch 到群集中指定数量的机器上去。
集群资源的利用率可能会很低,如果在任何计算阶段中并行任务数量的不是很多的话。例如,对于像 reduceByKey 和 reduceByKeyAndWindow 这样的分布式 Reduce 操作来说,默认的并行任务数量由 spark.default.parallelism 配置属性 控制。您可以传递并行的的参数(请看 PairDStreamFunctions 文档),或者设置 spark.default.parallelism 配置属性 来改变默认值。
可以通过调整序列化的格式来减少数据序列化的开销。在流式传输的情况下,有两种数据类型会被序列化。
在这两种情况下,使用 Kryo 序列化能够减少 CPU 和 内存的开销。更多细节请看 Spark 优化指南。对于 Kyro 来说,考虑注册自定义的 Class,并且禁用 Object(对象)引用跟踪(在 配置指南 中看 Kyro 相关的配置)。
在特定的情况下,需要用于保留 Streaming 应用程序的数据量不是很大,这样也许是可行的,来保存反序列化的数据(两种类型)不用引起过度的 GC 开销。例如,如果您使用 Batch 的间隔有几秒钟并且没有 Window(窗口)操作,然后你可以通过显式地设置存储相应的级别来尝试禁用序列化保存数据。这将减少由于序列化的 CPU 开销,可能不需要太多的 GC 开销就能提升性能。
如果每秒任务启动的数据很高(比如,每秒 50 个任务或者更多),那么发送 Task(任务) 到 Slave 的负载可能很大,将很难实现 亚秒级 延迟。可以通过下面的改变来降低负载 :
这些改变也许能减少批处理的时间(100s of milliseconds),因此亚秒级的 Batch 大小是可行的。
对于群集上运行的 Spark Streaming 应用程序来说应该是稳定的,系统应该尽可能快的处理接收到的数据。换句话说,批数据在生成后应该尽快被处理。对于应用程序来说无论这个是不是真的都可以通过在 Streaming Web UI 找到 监控 的处理时间,批处理时间应该小于批间隔。
取决于流计算的性质,所用的批间隔可能在通过群集中一组固定资源上的应用程序持续的数据速率有显著的影响。例如,让我们考虑下更早的 WordCountNetwork 例子。对于一个特定的数据速率。系统可能能够保持每 2 秒报告一次单词统计(即,批间隔是 2 秒),而不是每 500 毫秒一次。因此批间隔需要被设置,使得线上的预期数据速率可持续。
要为您的应用程序找出一个合理的批大小是去使用一个保守的批间隔(例如,5~10 秒)和一个较低的数据速率来测试它。为了验证系统是否能够保持数据速率,您可以通过每个被处理的 Batch 来检查端到端的延迟情况(或者在 Spark Driver log4j 日志中看 “Total delay”,或者使用 StreamingListener 接口)。如果延迟与批大小相比较处于一个稳定的状态,那么系统是稳定的。否则,如果延迟继续增加,它意味着系统不能保持下去,因此它是不稳定的。一旦你有了一个稳定的配置,你可以去试着增加数据速率 和/或者 降低批大小。注意一个短暂的延迟增加是由于临时的数据速率增加可能会变好,只要延迟降低到一个比较低的值。(即,小于批大小)。
调优 Spark 应用程序的内存使用情况和 GC 行为已经在 优化指南 中详细讨论了。强烈推荐您阅读它。在这一章,我们讨论在 Spark Streaming 应用程序 Context 中指定的一些优化参数。
Spark Streaming 应用程序在群集中需要的 Memory(内存) 数量取决于使用的 Transformations(转换)上的类型行为。例如,如果您想要在最近 10 分钟的时候上使用 Window(窗口)函数,那么您的群集应有有足够的 Memory 以保存 10 分钟值的数据在内存中。或者您想要在大量的 keys 中使用 updateStateByKey,那么所需要的内存将会更高。与之相反,如果您想要去做一个简单的 map-filter-store 操作,那么所需的内存将会更少。
在一般情况下,从数据通过 Receiver(接收器)被接收时起使用 StorageLevel.MEMORY_AND_DISK_SER_2 存储,数据在内存中放不下时将拆分到硬盘中去。这样也许降低了 Streaming 应用程序的性能,因此建议为您的 Streaming 应用程序提供足够的内存。它最好去试一试,并且在小范围看看 Memory(内存)使用情况然后估算相应的值。
内存调优的另一个方面是垃圾收集。Streaming 应用程序需要低延迟,JVM 垃圾回收造成的大量暂停是不可取的。
这里有一些能够帮助你调整内存使用情况以及 GC 开销的参数 :
应该记住的要点
在这部分,我们将讨论 Spark Streaming 应用程序在发生故障时的行为。
为了理解 Spark Streaming 提供的语义,让我们记住 Spark RDDs 最基本的容错语义。
Spark 在数据上的操作像 HDFS 或者 S3 这样容错的文件系统一样。因此,所有从容错的数据中产生的 RDDs 也是容错的。然而,这种情况在 Spark Streaming 中不适用,因为在大多数情况下数据通过网络被接收(除非使用 fileStream)。为了对所有产生的 RDDs 实现相同的容错语义属性,接收的数据被复制到群集中 Worker 节点的多个 Spark Executor 之间(默认复制因子是 2)。这将会造成需要在发生故障时去恢复两种文件系统的数据类型 :
此外,我们应该关注两种类型的故障 :
与这个基础知识一起,让我们理解 Spark Streaming 的容错语义。
Streaming 处理系统经常会捕获系统异常并记录执行次数以保障系统容错,其中在这三种条件下可以保障服务,等等。
在任何流处理系统,从广义上讲,处理数据有三个步骤。
接收数据 : 接受数据或者从其他数据源接受数据。
转换数据 : 所接收的数据使用 DStream 和 RDD 变换转化。
输出数据 : 最后变换的数据被推送到外部系统,如文件系统,数据库,DashBoard 等。
如果一个 Stream 应用程序来实现端到端的数据传输,则每个环节都需要一次性完整保障。也就是说,每个记录都必须被接收正好一次,恰好转化一次,被推到下游系统一次。让我们了解在 Spark Streaming 的情况下这些步骤的语义。
接收数据 : 不同的输入源提供不同的容错。详细过程在下一小节中讨论。
转换数据 : 已经接收将被处理一次的所有数据,这要依靠于 RDDs 提供保证。即使有故障,只要接收到的输入数据是可读的,最终通过 RDDs 的转化将始终具有相同的内容。
输出数据 : 定义了数据至少一次输出,因为它依赖于输出数据操作类型(是否支持转换)和定义的下游系统(是否支持事务)。但是,用户可以实现自己的传输机制,以实现只要一次定义。会在后面小节里有更详细的讨论。
不同的输入源提供不同的保障,从至少一次到正好一次。阅读更多的细节。
如果所有的输入数据已经存在如 HDFS 的容错文件系统,Spark Streaming 总是可以从任何故障中恢复所有数据。这种定义,在一次处理后就能恢复。
对于基于 receiver 的输入源,容错的语义既依赖于故障的情形也依赖于 receiver 的类型。正如之前讨论的,有两种类型的 receiver :
选择哪种类型的 receiver 依赖于这些语义。如果一个 worker 节点出现故障,Reliable Receiver 不会丢失数据,Unreliable Receiver 会丢失接收了但是没有复制的数据。如果 driver 节点出现故障,除了以上情况下的数据丢失,所有过去接收并复制到内存中的数据都会丢失,这会影响有状态 transformation 的结果。
为了避免丢失过去接收的数据,Spark 1.2 引入了一个实验性的特征(预写日志机制)write ahead logs,它保存接收的数据到容错存储系统中。有了 write ahead logs 和 Reliable Receiver,我们可以做到零数据丢失以及 exactly-once 语义。
下表总结了根据故障的语义 :
Deployment Scenario | Worker Failure | Driver Failure |
---|---|---|
Spark 1.1 或者更早, 没有 write ahead log 的 Spark 1.2 | 在 Unreliable Receiver 情况下缓冲数据丢失;在 Reliable Receiver 和文件的情况下,零数据丢失 | 在 Unreliable Receiver 情况下缓冲数据丢失;在所有 receiver 情况下,过去的数据丢失;在文件的情况下,零数据丢失 |
带有 write ahead log 的 Spark 1.2 | 在 Reliable Receiver 和文件的情况下,零数据丢失 | 在 Reliable Receiver 和文件的情况下,零数据丢失 |
在 Spark 1.3 中,我们引入了一个新的 kafka Direct 的 API,它可以保证所有 kafka 数据由 spark stream 收到一次。如果实现仅一次输出操作,就可以实现保证终端到终端的一次。这种方法(版本 Spark2.0.2)中进一步讨论 kafka集成指南。
输出操作(例如 foreachRDD
)至少被定义一次,即变换后的数据在一次人工故障的情况下可能会不止一次写入外部实体。虽然可以通过操作 saveAs***Files
保存文件到系统上(具有相同的数据将简单地被覆盖),额外的尝试可能是必要的,以实现一次准确的语义。有两种方法。
幂等更新 : 多次尝试写相同的数据。例如,saveAs***Files
总是写入相同数据生成的文件。
事务更新 : 所有更新事务作出这样的更新是恰好遵循原子性。要做到这一点,如下 2 种方式。
foreachRDD
)和 RDD 的分区索引。给这个应用定义唯一标识符标识 blob 数据。更新外部系统与当前事务(即原子性),如果已经被标识过得数据应用已经存在,那么就跳过更新。
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }
在 Spark 0.9.1 和 Spark 1.0 之间,它们有一些 API 的改变以确保未来 API 的稳定性。这一章阐述需要迁移您已经在的代码到 1.0 版本的步骤。
Input DSteams(输入流): 所有创建 Input Steam(输入流)的操作(例如,StreamingContext.socketStream,FlumeUtils.createStream,等等。)现在返回 InputDStream / ReceiverInputDStream (而不是 DStream) 的 Scala 版本,和 JavaInputDStream / JavaPairInputDStream /JavaReceiverInputDStream / JavaPairReceiverInputDStream(而不是 JavaDStream) 的 Java 版。这将确保特定输入流的功能可以添加到这些类中在未来不需要破坏二进制兼容性。注意您已经存在的 Spark Streaming 应用程序应该不需要任何改变(因为那些类是 DStream / JavaDStream 的子类),但可能需要使用 Spark 1.0 重新编译。
Custom Network Receivers(自定义网络接收器): 从发布 Spark Streaming 以来,自定义网络接收器能够在 Scala 使用 NetworkReceiver 类自定义,该 API 在错误处理和报告中被限制了,并且不能够被 Java 使用。从 Spark 1.0 开始,自个类被 Receiver 替换了并且带来了下面的好处。
为了迁移你已存在的自定义 Receiver,从更早的 NetworkReceiver 到新的 Receiver。您需要去做如下事情。
Actor-base Receivers(基于 Actor 的接收器): 这个基于 Actor 的接收器 API 已经被移到 DStream Akka 中去了。更多详细信息请参考该工程。