内存优化

调优 Spark 应用程序的内存使用情况和 GC 行为已经在 优化指南 中详细讨论了。强烈推荐您阅读它。在这一章,我们讨论在 Spark Streaming 应用程序 Context 中指定的一些优化参数。

Spark Streaming 应用程序在群集中需要的 Memory(内存) 数量取决于使用的 Transformations(转换)上的类型行为。例如,如果您想要在最近 10 分钟的时候上使用 Window(窗口)函数,那么您的群集应有有足够的 Memory 以保存 10 分钟值的数据在内存中。或者您想要在大量的 keys 中使用 updateStateByKey,那么所需要的内存将会更高。与之相反,如果您想要去做一个简单的 map-filter-store 操作,那么所需的内存将会更少。

在一般情况下,从数据通过 Receiver(接收器)被接收时起使用 StorageLevel.MEMORY_AND_DISK_SER_2 存储,数据在内存中放不下时将拆分到硬盘中去。这样也许降低了 Streaming 应用程序的性能,因此建议为您的 Streaming 应用程序提供足够的内存。它最好去试一试,并且在小范围看看 Memory(内存)使用情况然后估算相应的值。

内存调优的另一个方面是垃圾收集。Streaming 应用程序需要低延迟,JVM 垃圾回收造成的大量暂停是不可取的。

这里有一些能够帮助你调整内存使用情况以及 GC 开销的参数 : 

  • Persistence Level of DStreamsDStream 的持久化级别): 像前面所提到的 数据序列化 部分,输入的数据和 RDDs 默认持久化为序列化的字节。和反序列化持久性相比,这样减少了内存使用和 GC 开销。启用 Kryo 序列化进一步减少了序列化大小和内存使用。进一步减少内存使用可以用压缩实现(请看 Spark 配置 spark.rdd.compress),付出的是 CPU 时间。
  • Clearing old data(清除旧数据): 默认情况下,所有的输入数据和通过 DStream transformation(转换)产生的持久化 RDDs 将被自动的清除。Spark Streaming 决定何时清除基于使用 transformation(转换)的数据。例如,如果你使用一个 10 分钟的 Window(窗口)操作,那么 Spark Streaming 将保存最近 10 分钟的数据,并主动扔掉旧的数据。数据也能够通过设置 streamingContext.remeber 保持更久(例如,交互式查询旧数据)。
  • CMS Garbage CollectorCMS 垃圾回收器): 使用并发的 mark-sweep GC 是强烈推荐的用于保持 GC 相关的暂停更低。即使知道并发的 GC 降低了整个系统处理的吞吐量。仍然建议使用,以获得更一致的批处理时间。确定你在 Driver(在 spark-submit 中使用 --driver-java-options)和 Executor(使用 Spark 配置 spark.executor.extraJavaOptions)上设置的 CMS GC
  • Other tips(其它建议): 为了进一步降低 GC 开销,这里有些更多的建议可以尝试。
    • 持久化 RDDs 使用 OFF_HEAP 存储界别。更多详情请看 Spark 编程指南
    • 使用更多的 Executor 和更小的 heap size(堆大小)。这将在每个 JVM heap 内降低 GC 压力。

应该记住的要点

  • 一个 DStream 和一个单独的 Receiver(接收器)关联。为了达到并行的读取多个 Receiver(接收器)。例如,多个 DStreams 需要被创建。一个 Receiver 运行在一个 Executor 内。它占有一个 CoreCPU)。确保在 Receiver Slot 被预定后有足够的 Core 。例如,spark.cores.max 应该考虑 Receiver SlotReceiver 以循环的方式被分配到 Executor
  • 当数据从一个 Stream 源被接收时,Receiver(接收器) 创建了数据块。每个块间隔的毫秒内产生一个新的数据块。N 个数据块在批间隔(N = 批间隔 / 块间隔)的时候被创建。这些 Block(块)通过当前的 ExecutorBlockManager 发布到其它 ExecutorBlockManager。在那之后,运行在 Driver 上的  Network Input Tracker 获取 Block 位置用于进一步处理。
  • 一个 RDD 创建在 Driver 上,因为 Block 创建在 batchInterval(批间隔)期间。Block 在 batchInterval 划分成 RDD 时生成。每个分区是 Spark 中的一个任务。blockInterval == batchInterval 将意味着那是一个单独的分区被创建并且可能它在本地被处理过了。
  • Block 上的 Map 任务在 Executor 中被处理(一个接收 Block,另一个 Block 被复制)无论块的间隔,除非非本地调度死亡。有更大的块间隔意味着更大的块。在本地节点上一个高的值 spark.locality.wait 增加处理 Blcok 的机会。需要发现一个平衡在这两个参数来确保更大的块被本都处理之间。
  • 而不是依靠 batchIntervalblockInterval,你可以通过调用 inputDstream.repartition(n) 来定义分区的数量。这样会 reshuffles RDD 中的数据随机来创建 N 个分区。是的,为了更好的并行,虽然增加了 shuffle 的成本。一个 RDD 的处理通过 DriverJobScheduler 作为一个 Job 来调度。在给定的时间点仅有一个 Job 是活跃的。所以,如果一个 Job 正在执行那么其它的 Job 会排队。
  • 如果你有两个 DStream,那将有两个 RDDs 形成并且将有两个 Job 被创建,他们将被一个一个的调度。为了避免这个,你可以合并两个 DStream。这将确保两个 RDDDStream 形成一个单独的 unionRDD。这个 unionRDD 被作为一个单独的 Job 考虑。然而分区的 RDDs 不受影响。
  • 如果批处理时间超过了批间隔,那么显然 Receiver(接收器)的内存将开始填满,最走将抛出异常(最可能的是 BlockNotFoundException)。当前没有方法去暂停 Receiver。使用 SparkConf 配置  spark.streaming.receiver.maxRateReceiver(接收器)的速率可以被限制。