大多数 Saprk 计算本质是内存,Spark 程序可以碰到集群中的 CPU、网络带宽或存储 资源上的瓶颈。大多数情况下,如果数据加载到内存中,网络带宽就是瓶颈。但有时候,还需要做一些调整,比如序列化形式存储 RDD (storing RDDs in serialized),以减少内存使用情况。还涉及两个主要方面 : 原始数据的系列化,良好的网络性能,以减少内存使用情况。下面做几个主要的讨论。
序列化在任何分布式应用程序提高性能方面都是至关重要的思路。格式对象序列化对降低消耗大量的字节数,将大大减少计算量。通常情况下,这将是 Spark 调整以优化应用程序的最先考虑的。
Spark 开发目的就在方便(可以让您在操作中与任何的 Java Type 类型一起工作)和性能之间最大化。它提供了两个序列化库 :
你可以通过设置 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 初始化 SparkConf 来转化成 Kryo 序列化。此设置配置串行器不仅用在工作节点之间进行数据shuffle,而且还用于将RDD序列化到磁盘。Kryo 不能成为默认方式的唯一原因是需要用户进行注册 ,但建议在任何"网络密集"应用中进行尝试。自从Spark 2.0.0以来,我们在使用简单类型,简单类型的数组或字符串类型对RDD进行shuffle时,内部使用Kryo serializer。。
Spark自动包含Kryo序列化器,用于Twitter chill库中AllScalaRegistrar涵盖的许多常用的核心Scala类。
通过 registerKryoClasses 类的方法,用Kryo 注册的自定义类。
val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)
Kryo documentation 提供更多高效的注册选项,比如添加自定义序列化。
针对对象是很大,你可能还需要增加的 spark.kryoserializer.buffer config。该值对大量序列化有带来一定个性能提升最后,如果您没有注册自定义类,Kryo 仍然可以使用,但它有完整的类名称存储每个对象时,这是一种浪费...
内存优化有三个方面的考虑 : 对象所占用的内存,访问对象的消耗以及垃圾回收所占用的开销。
默认情况下,Java 对象存取速度快,但可以很容易地比内部 “raw” 数据的字段的消耗的 2-5 倍以及更多空间。这是由于以下几个原因 :
先介绍 Spark 内存管理的概述,然后再讨论具体策略使"用户"更有效地在应用去申请内存。特别是,我们将介绍对象的内存使用情况在哪里寻找,以及从哪些方面提升 - 改变你的数据结构,或存储的数据就是序列化格式。以及调整 Spark 缓存、Java 的垃圾收集器。
Spark中的内存使用大部分属于两类:execution 和storage。 execution 内存是指用于以shuffles, joins, sorts and aggregations计算的内存,而storage内存是指用于在集群中缓存和传播内部数据的内存。 在Spark中,execution 和storage共享一个统一的区域(M)。 当没有使用execution 内存时,storage可以获取所有可用的内存,反之亦然。 如果需要,execution 可以驱逐storage,但只有在总存储内存使用量到达某个阈值(R)之前。 换句话说,R描述了M内的一个子区域,其中缓存块永远不会被驱逐。 由于实施的复杂性,storage不得驱逐execution 。
这种设计保证几个显著的特性。首先,应用程序不需要缓存整个执行到空间中,避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留最小的存储空间(R),其中的数据块不会被移除。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而不需要用户内部如何分配内存的专业知识。
虽然有两种相关配置,但典型用户不需要调整它们,因为默认值适用于大多数工作负载::
spark.memory.fraction 值的配置不仅仅以调试的 JVM 堆空间或 “tenured” 设置。还有一些 GC 优化。
计算数据集所需的内存消耗量的最佳方式是创建RDD,将其放入缓存中,并查看Web UI中的“Storage”页面。 该页面将告诉您RDD占用多少内存。要估计特定对象的内存消耗,请使用SizeEstimator的估计方法这对于尝试使用不同的数据布局来调整内存使用情况以及确定广播变量在每个执行程序堆中占用的空间量非常有用。
减少内存消耗的首要方式是避免了 Java 特性开销,如基于指针的数据结构和二次封装对象。有几个方法可以做到这一点 :
当上面的优化都尝试过了对象同样很大。那么,还有一种减少内存的使用方法“以序列化形式存储数据”,在 RDD 持久化 API 中(RDD persistence API)使用序列化的 StorageLevel 例如 MEMORY_ONLY_SER 。 Spark 将每个 RDD 分区都保存为 byte 数组。序列化带来的唯一不足就是会降低访问速度,因为需要将对象反序列化(using Kryo)。如果需要采用序列化的方式缓存数据,我们强烈建议采用 Kryo,Kryo 序列化结果比 Java 标准序列化的更小(某种程度,甚至比对象内部的 raw 数据都还要小)。
如果你需要不断的“翻动”程序保存的 RDD 数据,JVM 内存回收就可能成为问题(通常,如果只需进行一次 RDD 读取然后进行操作是不会带来问题的)。当需要回收旧对象以便为新对象腾内存空间时,JVM 需要跟踪所有的 Java 对象以确定哪些对象是不再需要的。需要记住的一点是,内存回收的代价与对象的数量正相关;因此,使用对象数量更小的数据结构(例如使用int数组而不是 LinkedList)能显著降低这种消耗。另外一种更好的方法是采用对象序列化,如上面所描述的一样;这样,RDD 的每一个 partition 都会保存为唯一一个对象(一个 byte 数组)。如果内存回收存在问题,在尝试其他方法之前,首先尝试使用 序列化缓存(serialized caching) 。
每项任务(task)的工作内存(运行task所需要的空间)以及缓存在节点的 RDD 之间会相互影响,这种影响也会带来内存回收问题。下面我们讨论如何为 RDD 分配空间以便减轻这种影响。
优化内存回收的第一步是获取一些统计信息,包括内存回收的频率、内存回收耗费的时间等。为了获取这些统计信息,我们可以把参数 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 添加到 java 选项中( 配置向导里面有关于传 java 选项参数到 Spark job 的信息)。设置完成后,Spark 作业运行时,我们可以在日志中看到每一次内存回收的信息。注意,这些日志保存在集群的工作节点(在他们工作目录下的 stout 文件中)而不是你的驱动程序(driver program )。
为了进一步优化内存回收,我们需要了解 JVM 内存管理的一些基本知识。
Spark 内存回收优化的目标是确保只有长时间存活的 RDD 才保存到老生代区域;同时,新生代区域足够大以保存生命周期比较短的对象。这样,在任务执行期间可以避免执行 full GC 。下面是一些可能有用的执行步骤 :
我们的经历表明有效的内存回收优化取决于你的程序和内存大小。 在网上还有很多更多其他调优选项 , 总体而言有效控制内存回收的频率非常有助于降低额外开销。
executor 的 GC 调优标志位可以在 job 的配置中设置 spark.executor.extraJavaOptions 来指定。
除非你为每步操作设置的 并行度 足够大,否则集群的资源是无法被充分利用的。Spark 自动根据文件大小设置运行在每个文件上的 map 任务的数量(虽然你可以通过 SparkContext.textFile 的可选参数来控制数量,等等),而且对于分布式 reduce 操作,例如 groupByKey 和 reduceByKey ,它使用最大父 RDD 的分区数。你可以通过第二个参数传入并行度(阅读文档 spark.PairRDDFunctions
)或者通过设置系统参数 spark.default.parallelism 来改变默认值。通常来讲,在集群中,我们建议为每一个 CPU 核( core )分配 2-3 个任务。
有时,你会碰到 OutOfMemory 错误,这不是因为你的 RDD 不能加载到内存,而是因为 task 执行的数据集过大,例如正在执行 groupByKey 操作的 reduce 任务。Spark 的 shuffle 操作(sortByKey 、groupByKey 、reduceByKey 、join 等)为了实现 group 会为每一个任务创建哈希表,哈希表有可能非常大。最简单的修复方法是增加并行度,这样,每一个 task 的输入会变小。Spark 能够非常有效的支持短的 task(例如 200ms),因为他会复用一个 executor 的 JVM 来执行多个 task,这样能减小 task 启动的消耗,所以你可以放心的增加任务的并行度到大于集群的 CPU 核数。
使用 SparkContext 的 广播功能 可以有效减小每个序列化的 task 的大小以及在集群中启动 job 的消耗。如果 task 使用 driver program 中比较大的对象(例如静态查找表),考虑将其变成广播变量。Spark 会在 master 打印每一个 task 序列化后的大小,所以你可以通过它来检查 task 是不是过于庞大。通常来讲,大于 20KB 的 task 可能都是值得优化的。
数据本地性会对 Spark jobs 造成重大影响。如果数据和操作数据的代码在一起,计算就会变快。但如果代码和数据是分离的,其中一个必须移动到另一个。一般来说,移动序列化的代码比移动数据块来的快,因为代码大小远小于数据大小。Spark 根据该数据本地性的统一法则来构建 scheduling 计划。
数据本地性是指数据和操作该数据的代码有多近。根据数据的当前路径,有以下几个本地性级别。根据从近到远的数据排列 :
Spark 倾向于调度所有的 task 在最好的本地性级别,但未必总是行得通。如果所有空闲的 executor 都没有未处理的数据,Spark 就会切换到更低的本地性级别。
这样就有两个选择 :
Spark 典型做法是等待一段 timeout(超时)时间直到 CPU 释放资源。一旦 timeout 结束,它就开始移动数据到远处、有空闲 CPU 的地方。各个级别切换所等待的 timeout 时间可以单独配置或统一通过一个参数配置,需要更多细节可以查看配置页面的 spark.locality 参数。如果你的 task 看起来很长而且本地性差,就要考虑增加这些设置值,但默认设置一般都运行良好。
该文指出了 Spark 程序优化所需要关注的几个关键点 - 最主要的是 数据序列化 和 内存优化 。对于大多数程序而言,采用 Kryo 序列化以及以序列化方式存储数据能够解决大部分性能问题。非常欢迎在 Spark mailing list 提问优化相关的问题。