提交 032292ba 编写于 作者: 取昵称好难啊's avatar 取昵称好难啊

"( " to "(\ and " )" to ")"

上级 6f81fcd9
......@@ -4,7 +4,7 @@
# 启动 Spark on YARN
确保 `HADOOP_CONF_DIR` 或者 `YARN_CONF_DIR` 指向包含 Hadoop 集群的(客户端)配置文件的目录。这些配置被用于写入 HDFS 并连接到 YARN ResourceManager。此目录中包含的配置将被分发到 YARN 集群,以便 application(应用程序)使用的所有的所有 containers(容器)都使用相同的配置。如果配置引用了 Java 系统属性或者未由 YARN 管理的环境变量,则还应在 Spark 应用程序的配置(driver(驱动程序),executors(执行器),和在客户端模式下运行时的 AM )。
确保 `HADOOP_CONF_DIR` 或者 `YARN_CONF_DIR` 指向包含 Hadoop 集群的(客户端)配置文件的目录。这些配置被用于写入 HDFS 并连接到 YARN ResourceManager。此目录中包含的配置将被分发到 YARN 集群,以便 application(应用程序)使用的所有的所有 containers(容器)都使用相同的配置。如果配置引用了 Java 系统属性或者未由 YARN 管理的环境变量,则还应在 Spark 应用程序的配置(driver(驱动程序),executors(执行器),和在客户端模式下运行时的 AM)。
有两种部署模式可以用于在 YARN 上启动 Spark 应用程序。在 `cluster` 集群模式下,Spark driver 运行在集群上由 YARN 管理的application master 进程内,并且客户端可以在初始化应用程序后离开。在 `client` 客户端模式下,driver 在客户端进程中运行,并且 application master 仅用于从 YARN 请求资源。
......@@ -79,7 +79,7 @@ yarn logs -applicationId <app ID>
* 使用 `spark-submit` 上传一个自定义的 `log4j.properties`,通过将 spark-submit 添加到要与应用程序一起上传的文件的 –files 列表中。
* add `-Dlog4j.configuration=&lt;location of configuration file&gt;` to `spark.driver.extraJavaOptions` (for the driver) or `spark.executor.extraJavaOptions` (for executors). Note that if using a file, the `file:` protocol should be explicitly provided, and the file needs to exist locally on all the nodes.
* 添加 `-Dlog4j.configuration=&lt;配置文件的位置&gt;``spark.driver.extraJavaOptions`(对于驱动程序)或者 containers(对于执行者)。请注意,如果使用文件,文件:协议(protocol )应该被显式提供,并且该文件需要在所有节点的本地存在。
* 添加 `-Dlog4j.configuration=&lt;配置文件的位置&gt;``spark.driver.extraJavaOptions`(对于驱动程序)或者 containers(对于执行者)。请注意,如果使用文件,文件:协议(protocol)应该被显式提供,并且该文件需要在所有节点的本地存在。
* 更新 `$SPARK_CONF_DIR/log4j.properties` 文件,并且它将与其他配置一起自动上传。请注意,如果指定了多个选项,其他 2 个选项的优先级高于此选项。
请注意,对于第一个选项,executors 和 application master 将共享相同的 log4j 配置,这当它们在同一个节点上运行的时候,可能会导致问题(例如,试图写入相同的日志文件)。
......@@ -126,9 +126,9 @@ To use a custom metrics.properties for the application master and executors, upd
| `spark.yarn.submit.waitAppCompletion` | `true` | 在 YARN cluster 集群模式下,控制客户端是否等待退出,直到应用程序完成。如果设置为 `true`,客户端将保持报告应用程序的状态。否则,客户端进程将在提交后退出。 |
| `spark.yarn.am.nodeLabelExpression` | (none) | 一个将调度限制节点 AM 集合的 YARN 节点标签表达式。只有大于或等于 2.6 版本的 YARN 版本支持节点标签表达式,因此在对较早版本运行时,此属性将被忽略。 |
| `spark.yarn.executor.nodeLabelExpression` | (none) | 一个将调度限制节点执行器(executor)集合的 YARN 节点标签表达式。只有大于或等于 2.6 版本的 YARN 版本支持节点标签表达式,因此在对较早版本运行时,此属性将被忽略。 |
| `spark.yarn.tags` | (none) | 以逗号分隔的字符串列表,作为 YARN application 标记中显示的 YARN application 标记传递,可用于在查询 YARN apps 时进行过滤(filtering )。 |
| `spark.yarn.tags` | (none) | 以逗号分隔的字符串列表,作为 YARN application 标记中显示的 YARN application 标记传递,可用于在查询 YARN apps 时进行过滤(filtering)。 |
| `spark.yarn.keytab` | (none) | 包含上面指定的主体(principal)的 keytab 的文件的完整路径。此 keytab 将通过安全分布式缓存(Secure Distributed Cache)复制到运行 YARN Application Master 的节点,以定期更新 login tickets 和 delegation tokens.(也可以在 "local" master 下工作)。 |
| `spark.yarn.principal` | (none) | 在安全的 HDFS 上运行时用于登录 KDC 的主体(Principal )。(也可以在 "local" master 下工作) |
| `spark.yarn.principal` | (none) | 在安全的 HDFS 上运行时用于登录 KDC 的主体(Principal)。(也可以在 "local" master 下工作) |
| `spark.yarn.config.gatewayPath` | (none) | 一个在网关主机(gateway host)(启动 Spark application 的 host)上有效的路径,但对于集群中其他节点中相同资源的路径可能不同。结合 `spark.yarn.config.replacementPath`,这个用于支持具有异构配置的集群,以便 Spark 可以正确启动远程进程.替换路径(replacement path)通常将包含对由 YARN 导出的某些环境变量(以及,因此对于 Spark 容器可见)的引用.例如,如果网关节点(gateway node)在 `/disk1/hadoop` 上安装了 Hadoop 库,并且 Hadoop 安装的位置由 YARN 作为 `HADOOP_HOME` 环境变量导出,则将此值设置为 `/disk1/hadoop`,将替换路径(replacement path)设置为 `$HADOOP_HOME` 将确保用于启动远程进程的路径正确引用本地 YARN 配置。 |
| `spark.yarn.config.replacementPath` | (none) | 请看 `spark.yarn.config.gatewayPath`。 |
| `spark.yarn.security.credentials.${service}.enabled` | `true` | 控制是否在启用安全性时获取服务的 credentials(凭据)。默认情况下,在这些服务时,将检索所有支持的服务的凭据配置,但如果与某些方式冲突,可以禁用该行为。详情请见 [在安全的集群中运行](running-on-yarn.html#running-in-a-secure-cluster) |
......
......@@ -248,7 +248,7 @@ JavaSerializer | 用于序列化将通过网络发送或需要以序列化形式
| `spark.default.parallelism` | 对于分布式混洗(shuffle)操作,如 `reduceByKey``join`,父 RDD 中分区的最大数量。对于没有父 RDD 的 `parallelize` 操作,它取决于集群管理器:<br><li>本地模式:本地机器上的 core 数<br><li>Mesos 细粒度模式:8<br><li>其他:所有执行器节点上的 core 总数或者 2,以较大者为准 | 如果用户没有指定参数值,则这个属性是 `join``reduceByKey`,和 `parallelize` 等转换返回的 RDD 中的默认分区数。 |
| `spark.executor.heartbeatInterval` | 10s | 每个执行器的心跳与驱动程序之间的间隔。心跳让驱动程序知道执行器仍然存活,并用正在进行的任务的指标更新它 |
| `spark.files.fetchTimeout` | 60s | 获取文件的通讯超时,所获取的文件是从驱动程序通过 SparkContext.addFile() 添加的。 |
| `spark.files.useFetchCache` | true | 如果设置为 true(默认),文件提取将使用由属于同一应用程序的执行器共享的本地缓存,这可以提高在同一主机上运行许多执行器时的任务启动性能。如果设置为 false,这些缓存优化将被禁用,所有执行器将获取它们自己的文件副本。如果使用驻留在 NFS 文件系统上的 Spark 本地目录,可以禁用此优化(有关详细信息,请参阅 [SPARK-6313](https://issues.apache.org/jira/browse/SPARK-6313) )。 |
| `spark.files.useFetchCache` | true | 如果设置为 true(默认),文件提取将使用由属于同一应用程序的执行器共享的本地缓存,这可以提高在同一主机上运行许多执行器时的任务启动性能。如果设置为 false,这些缓存优化将被禁用,所有执行器将获取它们自己的文件副本。如果使用驻留在 NFS 文件系统上的 Spark 本地目录,可以禁用此优化(有关详细信息,请参阅 [SPARK-6313](https://issues.apache.org/jira/browse/SPARK-6313))。 |
| `spark.files.overwrite` | false | 当目标文件存在且其内容与源不匹配的情况下,是否覆盖通过 SparkContext.addFile() 添加的文件。 |
| `spark.files.maxPartitionBytes` | 134217728 (128 MB) | The maximum number of bytes to pack into a single partition when reading files. |
| `spark.files.openCostInBytes` | 4194304 (4 MB) | The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over estimate, then the partitions with small files will be faster than partitions with bigger files. |
......@@ -316,7 +316,7 @@ It also allows a different address from the local one to be advertised to execut
| `spark.dynamicAllocation.enabled` | false | 是否使用动态资源分配,它根据工作负载调整为此应用程序注册的执行程序数量。有关更多详细信息,请参阅 [here](job-scheduling.html#dynamic-resource-allocation) 的说明。这需要设置 `spark.shuffle.service.enabled`。以下配置也相关:`spark.dynamicAllocation.minExecutors``spark.dynamicAllocation.maxExecutors``spark.dynamicAllocation.initialExecutors`。 |
| `spark.dynamicAllocation.executorIdleTimeout` | 60s | 如果启用动态分配,并且执行程序已空闲超过此持续时间,则将删除执行程序。有关更多详细信息,请参阅此[description](job-scheduling.html#resource-allocation-policy)。 |
| `spark.dynamicAllocation.cachedExecutorIdleTimeout` | infinity | 如果启用动态分配,并且已缓存数据块的执行程序已空闲超过此持续时间,则将删除执行程序。有关详细信息,请参阅此 [description](job-scheduling.html#resource-allocation-policy)。 |
| `spark.dynamicAllocation.initialExecutors` | `spark.dynamicAllocation.minExecutors` | 启用动态分配时要运行的执行程序的初始数。如果 `--num-executors`(或 `spark.executor.instances` )被设置并大于此值,它将被用作初始执行器数。 |
| `spark.dynamicAllocation.initialExecutors` | `spark.dynamicAllocation.minExecutors` | 启用动态分配时要运行的执行程序的初始数。如果 `--num-executors`(或 `spark.executor.instances`)被设置并大于此值,它将被用作初始执行器数。 |
| `spark.dynamicAllocation.maxExecutors` | infinity | 启用动态分配的执行程序数量的上限。 |
| `spark.dynamicAllocation.minExecutors` | 0 | 启用动态分配的执行程序数量的下限。 |
| `spark.dynamicAllocation.schedulerBacklogTimeout` | 1s | 如果启用动态分配,并且有超过此持续时间的挂起任务积压,则将请求新的执行者。有关更多详细信息,请参阅此 [description](job-scheduling.html#resource-allocation-policy)。 |
......@@ -451,15 +451,15 @@ Spark 中的每个集群管理器都有额外的配置选项,这些配置可
# Environment Variables(环境变量)
通过环境变量配置特定的 Spark 设置。环境变量从 Spark 安装目录下的 `conf/spark-env.sh` 脚本读取(或者是 window 环境下的 `conf/spark-env.cmd` )。在 Standalone 和 Mesos 模式下,这个文件可以指定机器的特定信息,比如 hostnames。它也可以为正在运行的 Spark Application 或者提交脚本提供 sourced(来源).
通过环境变量配置特定的 Spark 设置。环境变量从 Spark 安装目录下的 `conf/spark-env.sh` 脚本读取(或者是 window 环境下的 `conf/spark-env.cmd`)。在 Standalone 和 Mesos 模式下,这个文件可以指定机器的特定信息,比如 hostnames。它也可以为正在运行的 Spark Application 或者提交脚本提供 sourced(来源).
注意,当 Spark 被安装,默认情况下 `conf/spark-env.sh` 是不存在的。但是,你可以通过拷贝 `conf/spark-env.sh.template` 来创建它。确保你的拷贝文件时可执行的。`spark-env.sh`:中有有以下变量可以被设置 :
| Environment Variable(环境变量)| Meaning(含义)|
| --- | --- |
| `JAVA_HOME` | Java 的安装路径(如果不在你的默认 `PATH` 下)。 |
| `PYSPARK_PYTHON` | 在 driver 和 worker 中 PySpark 用到的 Python 二进制可执行文件(如何有默认为 `python2.7`,否则为 `python` )。如果设置了属性 `spark.pyspark.python`,则会优先考虑。 |
| `PYSPARK_DRIVER_PYTHON` | 只在 driver 中 PySpark 用到的 Python 二进制可执行文件(默认为 `PYSPARK_PYTHON` )。如果设置了属性 `spark.pyspark.driver.python` ,则优先考虑。 |
| `SPARKR_DRIVER_R` | SparkR shell 用到的 R 二进制可执行文件(默认为 `R` )。如果设置了属性 `spark.r.shell.command` 则会优先考虑。 |
| `PYSPARK_PYTHON` | 在 driver 和 worker 中 PySpark 用到的 Python 二进制可执行文件(如何有默认为 `python2.7`,否则为 `python`)。如果设置了属性 `spark.pyspark.python`,则会优先考虑。 |
| `PYSPARK_DRIVER_PYTHON` | 只在 driver 中 PySpark 用到的 Python 二进制可执行文件(默认为 `PYSPARK_PYTHON`)。如果设置了属性 `spark.pyspark.driver.python` ,则优先考虑。 |
| `SPARKR_DRIVER_R` | SparkR shell 用到的 R 二进制可执行文件(默认为 `R`)。如果设置了属性 `spark.r.shell.command` 则会优先考虑。 |
| `SPARK_LOCAL_IP` | 机器绑定的 IP 地址。 |
| `SPARK_PUBLIC_DNS` | 你的 Spark 程序通知其他机器的 Hostname。 |
......@@ -475,7 +475,7 @@ Spark 用 [log4j](http://logging.apache.org/log4j/) 生成日志,你可以通
# Overriding configuration directory(覆盖配置目录)
如果你想指定不同的配置目录,而不是默认的 “SPARK_HOME/conf”,你可以设置 SPARK_CONF_DIR。Spark 将从这一目录下读取文件( spark-defaults.conf,spark-env.sh,log4j.properties 等)
如果你想指定不同的配置目录,而不是默认的 “SPARK_HOME/conf”,你可以设置 SPARK_CONF_DIR。Spark 将从这一目录下读取文件(spark-defaults.conf,spark-env.sh,log4j.properties 等)
# Inheriting Hadoop Cluster Configuration(继承 Hadoop 集群配置)
......
......@@ -14,7 +14,7 @@
* [数据本地化](#数据本地化)
* [概要](#概要)
由于大多数 Spark 计算的内存性质,Spark 程序可能由集群中的任何资源( CPU,网络带宽或内存)导致瓶颈。通常情况下,如果数据有合适的内存,瓶颈就是网络带宽,但有时您还需要进行一些调整,例如 [以序列化形式存储 RDD](programming-guide.html#rdd-persistence) 来减少内存的使用。本指南将涵盖两个主要的主题:数据序列化,这对于良好的网络性能至关重要,并且还可以减少内存使用和内存优化。我们选几个较小的主题进行展开。
由于大多数 Spark 计算的内存性质,Spark 程序可能由集群中的任何资源(CPU,网络带宽或内存)导致瓶颈。通常情况下,如果数据有合适的内存,瓶颈就是网络带宽,但有时您还需要进行一些调整,例如 [以序列化形式存储 RDD](programming-guide.html#rdd-persistence) 来减少内存的使用。本指南将涵盖两个主要的主题:数据序列化,这对于良好的网络性能至关重要,并且还可以减少内存使用和内存优化。我们选几个较小的主题进行展开。
# 数据序列化
......@@ -49,7 +49,7 @@ val sc = new SparkContext(conf)
* 每个不同的 Java 对象都有一个 “object header”,它大约是16个字节,包含一个指向它的类的指针。对于一个数据很少的对象(比如说一个`Int`字段),这可以比数据大。
* Java `String` 在原始字符串数据上具有大约40字节的开销(因为它们存储在 `Char` 数组中并保留额外的数据,例如长度),并且由于 UTF-16 的内部使用而将每个字符存储为_两个_字节 `String` 编码。因此,一个10个字符的字符串可以容易地消耗60个字节。
* 公共收集类,例如 `HashMap``LinkedList`,使用链接的数据结构,其中每个条目(例如: `Map.Entry` )存在”包装器”对象。该对象不仅具有 header,还包括指针(通常为8个字节)到列表中的下一个对象。
* 公共收集类,例如 `HashMap``LinkedList`,使用链接的数据结构,其中每个条目(例如: `Map.Entry`)存在”包装器”对象。该对象不仅具有 header,还包括指针(通常为8个字节)到列表中的下一个对象。
* 原始类型的集合通常将它们存储为”盒装”对象,例如: `java.lang.Integer`
本节将从 Spark 的内存管理概述开始,然后讨论用户可以采取的具体策略,以便在他/她的应用程序中更有效地使用内存。具体来说,我们将描述如何确定对象的内存使用情况,以及如何改进数据结构,或通过以串行格式存储数据。然后我们将介绍调整 Spark 的缓存大小和 Java 垃圾回收器。
......@@ -77,7 +77,7 @@ Spark 中的内存使用大部分属于两类:执行和存储。执行存储
减少内存消耗的第一种方法是避免添加开销的 Java 功能,例如基于指针的数据结构和包装对象。有几种方法可以做到这一点:
1. 将数据结构设计为偏好对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如: `HashMap` )。该 [fastutil](http://fastutil.di.unimi.it) 库提供方便的集合类基本类型是与 Java 标准库兼容。
1. 将数据结构设计为偏好对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如: `HashMap`)。该 [fastutil](http://fastutil.di.unimi.it) 库提供方便的集合类基本类型是与 Java 标准库兼容。
2. 尽可能避免使用很多小对象和指针的嵌套结构。
3. 考虑使用数字 ID 或枚举对象而不是键的字符串。
4. 如果您的 RAM 小于32 GB,请设置 JVM 标志 `-XX:+UseCompressedOops`,使指针为4个字节而不是8个字节。您可以添加这些选项 [`spark-env.sh`](configuration.html#environment-variables)
......@@ -88,13 +88,13 @@ Spark 中的内存使用大部分属于两类:执行和存储。执行存储
## 垃圾收集调整
当您的程序存储的 RDD 有很大的”流失”时,JVM 垃圾收集可能是一个问题。(程序中通常没有问题,只读一次 RDD,然后在其上运行许多操作)。当 Java 需要驱逐旧对象为新的对象腾出空间时,需要跟踪所有 Java 对象并找到未使用的。要记住的要点是,垃圾收集的成本与 Java 对象的数量成正比,因此使用较少对象的数据结构(例如: `Ints` 数组,而不是 `LinkedList` )大大降低了此成本。一个更好的方法是如上所述以序列化形式持久化对象:现在每个 RDD 分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 是一个问题,首先要使用[序列化缓存](#serialized-rdd-storage)
当您的程序存储的 RDD 有很大的”流失”时,JVM 垃圾收集可能是一个问题。(程序中通常没有问题,只读一次 RDD,然后在其上运行许多操作)。当 Java 需要驱逐旧对象为新的对象腾出空间时,需要跟踪所有 Java 对象并找到未使用的。要记住的要点是,垃圾收集的成本与 Java 对象的数量成正比,因此使用较少对象的数据结构(例如: `Ints` 数组,而不是 `LinkedList`)大大降低了此成本。一个更好的方法是如上所述以序列化形式持久化对象:现在每个 RDD 分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 是一个问题,首先要使用[序列化缓存](#serialized-rdd-storage)
由于任务的工作记忆(运行任务所需的空间)和缓存在节点上的 RDD 之间的干扰,GC 也可能是一个问题。我们将讨论如何控制分配给RDD缓存的空间来减轻这一点。
**测量 GC 的影响**
GC 调整的第一步是收集关于垃圾收集发生频率和GC花费的时间的统计信息。这可以通过添加 `-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps` 到 Java 选项来完成。(有关将 Java 选项传递给 Spark 作业的信息,请参阅[配置指南](configuration.html#Dynamically-Loading-Spark-Properties))下次运行 Spark 作业时,每当发生垃圾回收时,都会看到在工作日志中打印的消息。请注意,这些日志将在您的群集的工作节点上( `stdout` 在其工作目录中的文件中),而不是您的驱动程序。
GC 调整的第一步是收集关于垃圾收集发生频率和GC花费的时间的统计信息。这可以通过添加 `-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps` 到 Java 选项来完成。(有关将 Java 选项传递给 Spark 作业的信息,请参阅[配置指南](configuration.html#Dynamically-Loading-Spark-Properties))下次运行 Spark 作业时,每当发生垃圾回收时,都会看到在工作日志中打印的消息。请注意,这些日志将在您的群集的工作节点上(`stdout` 在其工作目录中的文件中),而不是您的驱动程序。
**高级 GC 优化**
......@@ -150,7 +150,7 @@ Spark 中 GC 调优的目的是确保只有长寿命的 RDD 存储在 Old 版本
* `RACK_LOCAL` 数据位于同一机架上的服务器上。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常通过单个交换机发送
* `ANY` 数据在网络上的其他地方,而不在同一个机架中
Spark 喜欢将所有 task 安排在最佳的本地级别,但这并不总是可能的。在任何空闲 executor 中没有未处理数据的情况下,Spark 将切换到较低的本地级别。有两个选项: a )等待一个繁忙的 CPU 释放在相同服务器上的数据上启动任务,或者 b )立即在更远的地方启动一个新的任务,需要在那里移动数据。
Spark 喜欢将所有 task 安排在最佳的本地级别,但这并不总是可能的。在任何空闲 executor 中没有未处理数据的情况下,Spark 将切换到较低的本地级别。有两个选项: a)等待一个繁忙的 CPU 释放在相同服务器上的数据上启动任务,或者 b)立即在更远的地方启动一个新的任务,需要在那里移动数据。
Spark 通常做的是等待一个繁忙的 CPU 释放的希望。一旦超时,它将开始将数据从远处移动到可用的 CPU。每个级别之间的回退等待超时可以在一个参数中单独配置或全部配置; 有关详细信息,请参阅[配置页面](configuration.html#scheduling) `spark.locality` 上的 参数。如果您的 task 很长,并且本地化差,您应该增加这些设置,但默认值通常会很好。
......
......@@ -705,7 +705,7 @@ We could also use `counts.sortByKey()`, for example, to sort the pairs alphabeti
| Action(动作)| Meaning(含义)|
| --- | --- |
| **reduce**(_func_) | 使用函数 _func_ 聚合 dataset 中的元素,这个函数 _func_ 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative )和关联(associative)的,这样才能保证它可以被并行地正确计算。
| **reduce**(_func_) | 使用函数 _func_ 聚合 dataset 中的元素,这个函数 _func_ 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative)和关联(associative)的,这样才能保证它可以被并行地正确计算。
| **collect**() | 在 driver 程序中,以一个 array 数组的形式返回 dataset 的所有元素。这在过滤器(filter)或其他操作(other operation)之后返回足够小(sufficiently small)的数据子集通常是有用的。
| **count**() | 返回 dataset 中元素的个数。
| **first**() | 返回 dataset 中的第一个元素(类似于 take(1)。
......@@ -831,7 +831,7 @@ broadcastVar.value();
## Accumulators(累加器)
Accumulators(累加器)是一个仅可以执行 “added”(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter( 计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且程序员可以添加新的支持类型。
Accumulators(累加器)是一个仅可以执行 “added”(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter(计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且程序员可以添加新的支持类型。
作为一个用户,您可以创建 accumulators(累加器)并且重命名。如下图所示,一个命名的 accumulator 累加器(在这个例子中是 `counter`)将显示在 web UI 中,用于修改该累加器的阶段。Spark 在 “Tasks” 任务表中显示由任务修改的每个累加器的值.
......
......@@ -506,7 +506,7 @@ Input DStreams 也可以从自定义数据源中创建。如果您想这样做
可以有两种基于他们的 _reliability可靠性_ 的数据源。数据源(如 Kafka 和 Flume)允许传输的数据被确认。如果系统从这些可靠的数据来源接收数据,并且被确认(acknowledges)正确地接收数据,它可以确保数据不会因为任何类型的失败而导致数据丢失。这样就出现了 2 种接收器(receivers):
1. _Reliable Receiver(可靠的接收器)_ - 当数据被接收并存储在 Spark 中并带有备份副本时,一个可靠的接收器(reliable receiver)正确地发送确认(acknowledgment)给一个可靠的数据源(reliable source).
2. _Unreliable Receiver(不可靠的接收器)_ - 一个不可靠的接收器( unreliable receiver )不发送确认(acknowledgment)到数据源。这可以用于不支持确认的数据源,或者甚至是可靠的数据源当你不想或者不需要进行复杂的确认的时候.
2. _Unreliable Receiver(不可靠的接收器)_ - 一个不可靠的接收器(unreliable receiver)不发送确认(acknowledgment)到数据源。这可以用于不支持确认的数据源,或者甚至是可靠的数据源当你不想或者不需要进行复杂的确认的时候.
[自定义 Receiver 指南](streaming-custom-receivers.html) 中描述了关于如何去编写一个 reliable receiver(可靠的接收器)的细节.
......@@ -1026,7 +1026,7 @@ words.foreachRDD(process)
请参阅完整的 [源代码](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/python/streaming/sql_network_wordcount.py).
您还可以对来自不同线程的流数据(即异步运行的 StreamingContext )上定义的表运行 SQL 查询。只需确保您将 StreamingContext 设置为记住足够数量的流数据,以便查询可以运行。否则,不知道任何异步 SQL 查询的 StreamingContext 将在查询完成之前删除旧的流数据。例如,如果要查询最后一个批次,但是您的查询可能需要5分钟才能运行,则可以调用 `streamingContext.remember(Minutes(5))`(以 Scala 或其他语言的等价物).
您还可以对来自不同线程的流数据(即异步运行的 StreamingContext)上定义的表运行 SQL 查询。只需确保您将 StreamingContext 设置为记住足够数量的流数据,以便查询可以运行。否则,不知道任何异步 SQL 查询的 StreamingContext 将在查询完成之前删除旧的流数据。例如,如果要查询最后一个批次,但是您的查询可能需要5分钟才能运行,则可以调用 `streamingContext.remember(Minutes(5))`(以 Scala 或其他语言的等价物).
有关DataFrames的更多信息,请参阅 [DataFrames 和 SQL 指南](sql-programming-guide.html).
......@@ -1327,7 +1327,7 @@ wordCounts.foreachRDD(echo)
* _配置 checkpoint_ - 如果 streaming 应用程序需要它,则 Hadoop API 兼容容错存储(例如:HDFS,S3等)中的目录必须配置为 checkpoint 目录,并且流程应用程序以 checkpoint 信息的方式编写 用于故障恢复。有关详细信息,请参阅 [checkpoint](#checkpointing) 部分.
* _配置应用程序 driver 的自动重新启动_ - 要从 driver 故障自动恢复,用于运行流应用程序的部署基础架构必须监视 driver 进程,并在 driver 发生故障时重新启动 driver.不同的 [集群管理者](cluster-overview.html#cluster-manager-types) 有不同的工具来实现这一点.
* _Spark Standalone_ - 可以提交 Spark 应用程序 driver 以在Spark Standalone集群中运行(请参阅 [集群部署模式](spark-standalone.html#launching-spark-applications) ),即应用程序 driver 本身在其中一个工作节点上运行。此外,可以指示独立的群集管理器来监督 driver,如果由于非零退出代码而导致 driver 发生故障,或由于运行 driver 的节点发生故障,则可以重新启动它。有关详细信息,请参阅 [Spark Standalone 指南]](spark-standalone.html) 中的群集模式和监督.
* _Spark Standalone_ - 可以提交 Spark 应用程序 driver 以在Spark Standalone集群中运行(请参阅 [集群部署模式](spark-standalone.html#launching-spark-applications)),即应用程序 driver 本身在其中一个工作节点上运行。此外,可以指示独立的群集管理器来监督 driver,如果由于非零退出代码而导致 driver 发生故障,或由于运行 driver 的节点发生故障,则可以重新启动它。有关详细信息,请参阅 [Spark Standalone 指南]](spark-standalone.html) 中的群集模式和监督.
* _YARN_ - Yarn 支持类似的机制来自动重新启动应用程序.有关详细信息,请参阅 YARN文档.
* _Mesos_ - [Marathon](https://github.com/mesosphere/marathon) 已被用来实现这一点与Mesos.
* _配置预写日志_ - 自 Spark 1.2 以来,我们引入了写入日志来实现强大的容错保证.如果启用,则从 receiver 接收的所有数据都将写入配置 checkpoint 目录中的写入日志.这可以防止 driver 恢复时的数据丢失,从而确保零数据丢失(在 [容错语义](#fault-tolerance-semantics) 部分中详细讨论).可以通过将 [配置参数](configuration.html#spark-streaming) `spark.streaming.receiver.writeAheadLog.enable` 设置为 `true`来启用此功能.然而,这些更强的语义可能以单个 receiver 的接收吞吐量为代价.通过 [并行运行更多的 receiver](#level-of-parallelism-in-data-receiving) 可以纠正这一点,以增加总吞吐量.另外,建议在启用写入日志时,在日志已经存储在复制的存储系统中时,禁用在 Spark 中接收到的数据的复制.这可以通过将输入流的存储级别设置为 `StorageLevel.MEMORY_AND_DISK_SER` 来完成.使用 S3(或任何不支持刷新的文件系统)写入日志时,请记住启用 `spark.streaming.driver.writeAheadLog.closeFileAfterWrite``spark.streaming.receiver.writeAheadLog.closeFileAfterWrite`.有关详细信息,请参阅 [Spark Streaming配](configuration.html#spark-streaming).请注意,启用 I/O 加密时,Spark 不会将写入写入日志的数据加密.如果需要对提前记录数据进行加密,则应将其存储在本地支持加密的文件系统中.
......@@ -1401,13 +1401,13 @@ unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
```
应考虑的另一个参数是 receiver’s block interval(接收器的块间隔),这由[configuration parameter(配置参数)](configuration.html#spark-streaming)`spark.streaming.blockInterval` 决定.对于大多数 receivers(接收器),接收到的数据 coalesced(合并)在一起存储在 Spark 内存之前的 blocks of data(数据块).每个 batch(批次)中的 blocks(块)数确定将用于处理接收到的数据以 map-like(类似与 map 形式的)transformation(转换)的 task(任务)的数量.每个 receiver(接收器)每 batch(批次)的任务数量将是大约( batch interval(批间隔)/ block interval(块间隔)).例如,200 ms的 block interval(块间隔)每 2 秒 batches(批次)创建 10 个 tasks(任务).如果 tasks(任务)数量太少(即少于每个机器的内核数量),那么它将无效,因为所有可用的内核都不会被使用处理数据.要增加 given batch interval(给定批间隔)的 tasks(任务)数量,请减少 block interval(块间​​隔).但是,推荐的 block interval(块间隔)最小值约为 50ms,低于此任务启动开销可能是一个问题.
应考虑的另一个参数是 receiver’s block interval(接收器的块间隔),这由[configuration parameter(配置参数)](configuration.html#spark-streaming)`spark.streaming.blockInterval` 决定.对于大多数 receivers(接收器),接收到的数据 coalesced(合并)在一起存储在 Spark 内存之前的 blocks of data(数据块).每个 batch(批次)中的 blocks(块)数确定将用于处理接收到的数据以 map-like(类似与 map 形式的)transformation(转换)的 task(任务)的数量.每个 receiver(接收器)每 batch(批次)的任务数量将是大约(batch interval(批间隔)/ block interval(块间隔)).例如,200 ms的 block interval(块间隔)每 2 秒 batches(批次)创建 10 个 tasks(任务).如果 tasks(任务)数量太少(即少于每个机器的内核数量),那么它将无效,因为所有可用的内核都不会被使用处理数据.要增加 given batch interval(给定批间隔)的 tasks(任务)数量,请减少 block interval(块间​​隔).但是,推荐的 block interval(块间隔)最小值约为 50ms,低于此任务启动开销可能是一个问题.
使用 multiple input streams(多个输入流)/ receivers(接收器)接收数据的替代方法是明确 repartition(重新分配)input data stream(输入数据流)(使用 `inputStream.repartition(&lt;number of partitions&gt;)` )。这会在 further processing(进一步处理)之前将 received batches of data(收到的批次数据)distributes(分发)到集群中指定数量的计算机.
使用 multiple input streams(多个输入流)/ receivers(接收器)接收数据的替代方法是明确 repartition(重新分配)input data stream(输入数据流)(使用 `inputStream.repartition(&lt;number of partitions&gt;)`)。这会在 further processing(进一步处理)之前将 received batches of data(收到的批次数据)distributes(分发)到集群中指定数量的计算机.
### Level of Parallelism in Data Processing(数据处理中的并行度水平)
如果在任何 computation(计算)阶段中使用 number of parallel tasks(并行任务的数量),则 Cluster resources(集群资源)可能未得到充分利用。例如,对于 distributed reduce(分布式 reduce)操作,如 `reduceByKey``reduceByKeyAndWindow`,默认并行任务的数量由 `spark.default.parallelism` [configuration property](configuration.html#spark-properties) 控制。您 可以通过 parallelism(并行度)作为参数(见 [`PairDStreamFunctions`](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions) 文档 ),或设置 `spark.default.parallelism` [configuration property](configuration.html#spark-properties) 更改默认值.
如果在任何 computation(计算)阶段中使用 number of parallel tasks(并行任务的数量),则 Cluster resources(集群资源)可能未得到充分利用。例如,对于 distributed reduce(分布式 reduce)操作,如 `reduceByKey``reduceByKeyAndWindow`,默认并行任务的数量由 `spark.default.parallelism` [configuration property](configuration.html#spark-properties) 控制。您 可以通过 parallelism(并行度)作为参数(见 [`PairDStreamFunctions`](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions) 文档),或设置 `spark.default.parallelism` [configuration property](configuration.html#spark-properties) 更改默认值.
### Data Serialization(数据序列化)
......@@ -1453,11 +1453,11 @@ memory tuning(内存调优)的另一个方面是 garbage collection(垃圾
有几个 parameters(参数)可以帮助您调整 memory usage(内存使用量)和 GC 开销:
* **Persistence Level of DStreams(DStreams 的持久性级别)**:如前面在 [Data Serialization](#data-serialization) 部分中所述,input data 和 RDD 默认保持为 serialized bytes(序列化字节).与 deserialized persistence(反序列化持久性)相比,这减少了内存使用量和 GC 开销.启用 Kryo serialization 进一步减少了 serialized sizes(序列化大小)和 memory usage(内存使用).可以通过 compression(压缩)来实现内存使用的进一步减少(参见Spark配置 `spark.rdd.compress` ),代价是 CPU 时间.
* **Persistence Level of DStreams(DStreams 的持久性级别)**:如前面在 [Data Serialization](#data-serialization) 部分中所述,input data 和 RDD 默认保持为 serialized bytes(序列化字节).与 deserialized persistence(反序列化持久性)相比,这减少了内存使用量和 GC 开销.启用 Kryo serialization 进一步减少了 serialized sizes(序列化大小)和 memory usage(内存使用).可以通过 compression(压缩)来实现内存使用的进一步减少(参见Spark配置 `spark.rdd.compress`),代价是 CPU 时间.
* **Clearing old data(清除旧数据)**:默认情况下,DStream 转换生成的所有 input data 和 persisted RDDs 将自动清除。Spark Streaming 决定何时根据所使用的 transformations(转换)来清除数据.例如,如果您使用 10 分钟的 window operation(窗口操作),则 Spark Streaming 将保留最近 10 分钟的数据,并主动丢弃旧数据。数据可以通过设置 `streamingContext.remember` 保持更长的持续时间(例如交互式查询旧数据).
* **CMS Garbage Collector(CMS垃圾收集器)**:强烈建议使用 concurrent mark-and-sweep GC,以保持 GC 相关的暂停始终如一.即使 concurrent GC 已知可以减少 系统的整体处理吞吐量,其使用仍然建议实现更多一致的 batch processing times(批处理时间).确保在 driver(使用 `--driver-java-options``spark-submit` )和 executors(使用 [Spark configuration](configuration.html#runtime-environment) `spark.executor.extraJavaOptions` )中设置 CMS GC.
* **CMS Garbage Collector(CMS垃圾收集器)**:强烈建议使用 concurrent mark-and-sweep GC,以保持 GC 相关的暂停始终如一.即使 concurrent GC 已知可以减少 系统的整体处理吞吐量,其使用仍然建议实现更多一致的 batch processing times(批处理时间).确保在 driver(使用 `--driver-java-options``spark-submit`)和 executors(使用 [Spark configuration](configuration.html#runtime-environment) `spark.executor.extraJavaOptions`)中设置 CMS GC.
* **Other tips(其他提示)**:为了进一步降低 GC 开销,以下是一些更多的提示.
......@@ -1480,7 +1480,7 @@ memory tuning(内存调优)的另一个方面是 garbage collection(垃圾
* 如果您有两个 dstream,将会有两个 RDD 形成,并且将创建两个将被安排在另一个之后的作业.为了避免这种情况,你可以联合两个 dstream .这将确保为 dstream 的两个 RDD 形成一个 unionRDD .这个 unionRDD 然后被认为是一个 single job(单一的工作).但 RDD 的 partitioning(分区)不受影响.
* 如果 batch processing time(批处理时间)超过 batchinterval(批次间隔),那么显然 receiver 的内存将会开始填满,最终会抛出 exceptions(最可能是 BlockNotFoundException ).目前没有办法暂停 receiver .使用 SparkConf 配置 `spark.streaming.receiver.maxRate`,receiver 的 rate 可以受到限制.
* 如果 batch processing time(批处理时间)超过 batchinterval(批次间隔),那么显然 receiver 的内存将会开始填满,最终会抛出 exceptions(最可能是 BlockNotFoundException).目前没有办法暂停 receiver .使用 SparkConf 配置 `spark.streaming.receiver.maxRate`,receiver 的 rate 可以受到限制.
* * *
......@@ -1498,7 +1498,7 @@ memory tuning(内存调优)的另一个方面是 garbage collection(垃圾
2. 如果 RDD 的任何 partition 由于工作节点故障而丢失,则该分区可以是 从 original fault-tolerant dataset(原始容错数据集)中使用业务流程重新计算.
3. 假设所有的 RDD transformations 都是确定性的,最后的数据被转换,无论 Spark 集群中的故障如何,RDD 始终是一样的.
Spark 运行在容错文件系统(如 HDFS 或 S3 )中的数据上.因此,从容错数据生成的所有 RDD 也都是容错的.但是,这不是在大多数情况下,Spark Streaming 作为数据的情况通过网络接收(除非 `fileStream` 被使用).为了为所有生成的 RDD 实现相同的 fault-tolerance properties(容错属性),接收的数据在集群中的工作节点中的多个 Spark executors 之间进行复制(默认 replication factor(备份因子)为 2).这导致了发生故障时需要恢复的系统中的两种数据:
Spark 运行在容错文件系统(如 HDFS 或 S3)中的数据上.因此,从容错数据生成的所有 RDD 也都是容错的.但是,这不是在大多数情况下,Spark Streaming 作为数据的情况通过网络接收(除非 `fileStream` 被使用).为了为所有生成的 RDD 实现相同的 fault-tolerance properties(容错属性),接收的数据在集群中的工作节点中的多个 Spark executors 之间进行复制(默认 replication factor(备份因子)为 2).这导致了发生故障时需要恢复的系统中的两种数据:
1. _Data received and replicated(数据接收和复制)_ - 这个数据在单个工作节点作为副本的故障中幸存下来,它存在于其他节点之一上.
2. _Data received but buffered for replication(接收数据但缓冲进行复制)_ - 由于不复制,恢复此数据的唯一方法是从 source 重新获取.
......@@ -1534,7 +1534,7 @@ streaming systems(流系统)的语义通常是通过系统可以处理每个
2. _Transforming the data(转换数据)_:所有已收到的数据都将被处理 _exactly once_,这得益于 RDD 提供的保证.即使存在故障,只要接收到的输入数据可访问,最终变换的 RDD 将始终具有相同的内容.
3. _Pushing out the data(推出数据)_:默认情况下的输出操作确保 _at-least once_ 语义,因为它取决于输出操作的类型( idempotent(幂等))或 downstream system(下游系统)的语义(是否支持 transactions(事务)).但用户可以实现自己的事务机制来实现 _exactly-once_ 语义.这将在本节后面的更多细节中讨论.
3. _Pushing out the data(推出数据)_:默认情况下的输出操作确保 _at-least once_ 语义,因为它取决于输出操作的类型(idempotent(幂等))或 downstream system(下游系统)的语义(是否支持 transactions(事务)).但用户可以实现自己的事务机制来实现 _exactly-once_ 语义.这将在本节后面的更多细节中讨论.
## Semantics of Received Data(接收数据的语义)
......@@ -1569,7 +1569,7 @@ streaming systems(流系统)的语义通常是通过系统可以处理每个
## Semantics of output operations(输出操作的语义)
Output operations(输出操作)(如 `foreachRDD` )具有 _at-least once_ 语义,也就是说,transformed data(变换后的数据)可能会不止一次写入 external entity(外部实体)在一个 worker 故障事件中.虽然这是可以接受的使用 `saveAs***Files`操作(因为文件将被相同的数据简单地覆盖)保存到文件系统,可能需要额外的努力来实现 exactly-once(一次且仅一次)语义.有两种方法.
Output operations(输出操作)(如 `foreachRDD`)具有 _at-least once_ 语义,也就是说,transformed data(变换后的数据)可能会不止一次写入 external entity(外部实体)在一个 worker 故障事件中.虽然这是可以接受的使用 `saveAs***Files`操作(因为文件将被相同的数据简单地覆盖)保存到文件系统,可能需要额外的努力来实现 exactly-once(一次且仅一次)语义.有两种方法.
* _Idempotent updates(幂等更新)_:多次尝试总是写入相同的数据.例如,`saveAs***Files` 总是将相同的数据写入生成的文件.
......
......@@ -1383,7 +1383,7 @@ Spark SQL 支持通过 DataFrame 接口对各种 data sources(数据源)进
## Generic Load/Save Functions(通用 加载/保存 功能)
在最简单的形式中,默认数据源(`parquet`,除非另有配置 `spark.sql.sources.default` )将用于所有操作.
在最简单的形式中,默认数据源(`parquet`,除非另有配置 `spark.sql.sources.default`)将用于所有操作.
```
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
......@@ -1416,7 +1416,7 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
### Manually Specifying Options(手动指定选项)
您还可以 manually specify(手动指定)将与任何你想传递给 data source 的其他选项一起使用的 data source。Data sources 由其 fully qualified name(完全限定名称)(即 `org.apache.spark.sql.parquet` ),但是对于 built-in sources(内置的源),你也可以使用它们的 shortnames(短名称)(`json``parquet``jdbc``orc``libsvm``csv``text`).从任何 data source type(数据源类型)加载 DataFrames 可以使用此 syntax(语法)转换为其他类型.
您还可以 manually specify(手动指定)将与任何你想传递给 data source 的其他选项一起使用的 data source。Data sources 由其 fully qualified name(完全限定名称)(即 `org.apache.spark.sql.parquet`),但是对于 built-in sources(内置的源),你也可以使用它们的 shortnames(短名称)(`json``parquet``jdbc``orc``libsvm``csv``text`).从任何 data source type(数据源类型)加载 DataFrames 可以使用此 syntax(语法)转换为其他类型.
```
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
......@@ -1490,7 +1490,7 @@ Save operations(保存操作)可以选择使用 `SaveMode`,它指定如何
### Saving to Persistent Tables(保存到持久表)
`DataFrames` 也可以使用 `saveAsTable` 命令作为 persistent tables(持久表)保存到 Hive metastore 中。请注意,existing Hive deployment(现有的 Hive 部署)不需要使用此功能。Spark 将为您创建默认的 local Hive metastore(本地 Hive metastore)(使用 Derby )。与 `createOrReplaceTempView` 命令不同,`saveAsTable` 将 materialize(实现)DataFrame 的内容,并创建一个指向 Hive metastore 中数据的指针。即使您的 Spark 程序重新启动,Persistent tables(持久性表)仍然存在,因为您保持与同一个 metastore 的连接。可以通过使用表的名称在 `SparkSession` 上调用 `table` 方法来创建 persistent tabl(持久表)的 DataFrame .
`DataFrames` 也可以使用 `saveAsTable` 命令作为 persistent tables(持久表)保存到 Hive metastore 中。请注意,existing Hive deployment(现有的 Hive 部署)不需要使用此功能。Spark 将为您创建默认的 local Hive metastore(本地 Hive metastore)(使用 Derby)。与 `createOrReplaceTempView` 命令不同,`saveAsTable` 将 materialize(实现)DataFrame 的内容,并创建一个指向 Hive metastore 中数据的指针。即使您的 Spark 程序重新启动,Persistent tables(持久性表)仍然存在,因为您保持与同一个 metastore 的连接。可以通过使用表的名称在 `SparkSession` 上调用 `table` 方法来创建 persistent tabl(持久表)的 DataFrame .
对于 file-based(基于文件)的 data source(数据源),例如 text,parquet,json等,您可以通过 `path` 选项指定 custom table path(自定义表路径),例如 `df.write.option("path", "/some/path").saveAsTable("t")`。当表被 dropped(删除)时,custom table path(自定义表路径)将不会被删除,并且表数据仍然存在。如果未指定自定义表路径,Spark 将把数据写入 warehouse directory(仓库目录)下的默认表路径。当表被删除时,默认的表路径也将被删除.
......@@ -2523,7 +2523,7 @@ bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.
| `batchsize` | JDBC 批处理的大小,用于确定每次数据往返传递的行数。这有利于提升 JDBC driver 的性能。该选项仅适用于写操作。默认值为 `1000`
| `isolationLevel` | 事务隔离级别,适用于当前连接。它可以是 `NONE``READ_COMMITTED``READ_UNCOMMITTED``REPEATABLE_READ`,或 `SERIALIZABLE` 之一,对应于 JDBC 连接对象定义的标准事务隔离级别,默认为 `READ_UNCOMMITTED`。此选项仅适用于写操作。请参考 `java.sql.Connection` 中的文档。 |
| `truncate` | 这是一个与 JDBC 相关的选项。启用 `SaveMode.Overwrite` 时,此选项会导致 Spark 截断现有表,而不是删除并重新创建。这可以更有效,并且防止表元数据(例如,索引)被移除。但是,在某些情况下,例如当新数据具有不同的模式时,它将无法工作。它默认为 `false`。此选项仅适用于写操作。 |
| `createTableOptions` | 这是一个与JDBC相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:`CREATE TABLE t (name string) ENGINE=InnoDB.` )。此选项仅适用于写操作。 |
| `createTableOptions` | 这是一个与JDBC相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:`CREATE TABLE t (name string) ENGINE=InnoDB.`)。此选项仅适用于写操作。 |
| `createTableColumnTypes` | 使用数据库列数据类型而不是默认值,创建表时。数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:`"name CHAR(64), comments VARCHAR(1024)"`)。指定的类型应该是有效的 spark sql 数据类型。此选项仅适用于写操作。 |
```
......
......@@ -31,7 +31,7 @@
# 概述
GraphX 是 Spark 中用于图形和图形并行计算的新组件。在高层次上,GraphX 通过引入一个新的[图形](#property_graph)抽象来扩展 Spark [RDD](api/scala/index.html#org.apache.spark.rdd.RDD):一种具有附加到每个顶点和边缘的属性的定向多重图形。为了支持图计算,GraphX 公开了一组基本运算符(例如: [subgraph](#structural_operators)[joinVertices](#join_operators)[aggregateMessages](#aggregateMessages) )以及 [Pregel](#pregel) API 的优化变体。此外,GraphX 还包括越来越多的图形[算法](#graph_algorithms)[构建器](#graph_builders),以简化图形分析任务。
GraphX 是 Spark 中用于图形和图形并行计算的新组件。在高层次上,GraphX 通过引入一个新的[图形](#property_graph)抽象来扩展 Spark [RDD](api/scala/index.html#org.apache.spark.rdd.RDD):一种具有附加到每个顶点和边缘的属性的定向多重图形。为了支持图计算,GraphX 公开了一组基本运算符(例如: [subgraph](#structural_operators)[joinVertices](#join_operators)[aggregateMessages](#aggregateMessages))以及 [Pregel](#pregel) API 的优化变体。此外,GraphX 还包括越来越多的图形[算法](#graph_algorithms)[构建器](#graph_builders),以简化图形分析任务。
# 入门
......@@ -48,7 +48,7 @@ import org.apache.spark.rdd.RDD
# 属性 Graph
[属性 Graph](api/scala/index.html#org.apache.spark.graphx.Graph) 是一个定向多重图形,用户定义的对象附加到每个顶点和边缘。定向多图是具有共享相同源和目标顶点的潜在多个平行边缘的有向图。支持平行边缘的能力简化了在相同顶点之间可以有多个关系(例如: 同事和朋友)的建模场景。每个顶点都由唯一的64位长标识符( `VertexId` )键入。GraphX 不对顶点标识符施加任何排序约束。类似地,边缘具有对应的源和目标顶点标识符。
[属性 Graph](api/scala/index.html#org.apache.spark.graphx.Graph) 是一个定向多重图形,用户定义的对象附加到每个顶点和边缘。定向多图是具有共享相同源和目标顶点的潜在多个平行边缘的有向图。支持平行边缘的能力简化了在相同顶点之间可以有多个关系(例如: 同事和朋友)的建模场景。每个顶点都由唯一的64位长标识符(`VertexId`)键入。GraphX 不对顶点标识符施加任何排序约束。类似地,边缘具有对应的源和目标顶点标识符。
属性图是通过 vertex (`VD`)和 edge (`ED`) 类型进行参数化的。这些是分别与每个顶点和边缘相关联的对象的类型。
......@@ -151,7 +151,7 @@ facts.collect.foreach(println(_))
# Graph 运算符
正如 RDDs 有这样的基本操作 `map``filter`,以及 `reduceByKey`,性能图表也有采取用户定义的函数基本运算符的集合,产生具有转化特性和结构的新图。定义了优化实现的核心运算符,并定义了 [`Graph`](api/scala/index.html#org.apache.spark.graphx.Graph) 表示为核心运算符组合的方便运算符 [`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps)。不过,由于 Scala 的含义,操作员 `GraphOps` 可自动作为成员使用 `Graph`。例如,我们可以通过以下方法计算每个顶点的入度(定义 `GraphOps` ):
正如 RDDs 有这样的基本操作 `map``filter`,以及 `reduceByKey`,性能图表也有采取用户定义的函数基本运算符的集合,产生具有转化特性和结构的新图。定义了优化实现的核心运算符,并定义了 [`Graph`](api/scala/index.html#org.apache.spark.graphx.Graph) 表示为核心运算符组合的方便运算符 [`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps)。不过,由于 Scala 的含义,操作员 `GraphOps` 可自动作为成员使用 `Graph`。例如,我们可以通过以下方法计算每个顶点的入度(定义 `GraphOps`):
```
val graph: Graph[(String, String), String]
......@@ -329,7 +329,7 @@ val validCCGraph = ccGraph.mask(validGraph)
## Join 运算符
在许多情况下,有必要使用图形连接来自外部收集( RDD )的数据。例如,我们可能有额外的用户属性,我们要与现有的图形合并,或者我们可能希望将顶点属性从一个图形拉到另一个。这些任务可以使用 _join_ 运算符完成。下面我们列出关键 join 运算符:
在许多情况下,有必要使用图形连接来自外部收集(RDD)的数据。例如,我们可能有额外的用户属性,我们要与现有的图形合并,或者我们可能希望将顶点属性从一个图形拉到另一个。这些任务可以使用 _join_ 运算符完成。下面我们列出关键 join 运算符:
```
class Graph[VD, ED] {
......@@ -373,7 +373,7 @@ val joinedGraph = graph.joinVertices(uniqueCosts,
## 邻域聚合
许多图形分析任务的关键步骤是聚合关于每个顶点邻域的信息。例如,我们可能想知道每个用户拥有的关注者数量或每个用户的追随者的平均年龄。许多迭代图表算法(例如:网页级别,最短路径,以及连接成分)相邻顶点(例如:电流值的 PageRank,最短到源路径,和最小可达顶点 ID )的重复聚合性质。
许多图形分析任务的关键步骤是聚合关于每个顶点邻域的信息。例如,我们可能想知道每个用户拥有的关注者数量或每个用户的追随者的平均年龄。许多迭代图表算法(例如:网页级别,最短路径,以及连接成分)相邻顶点(例如:电流值的 PageRank,最短到源路径,和最小可达顶点 ID)的重复聚合性质。
> 为了提高性能,主聚合操作员 `graph.mapReduceTriplets` 从新的更改 `graph.AggregateMessages`。虽然 API 的变化相对较小,但我们在下面提供了一个转换指南。
......@@ -497,7 +497,7 @@ class GraphOps[VD, ED] {
## Caching and Uncaching
在 Spark 中,默认情况下,RDD 不会保留在内存中。为了避免重新计算,在多次使用它们时,必须明确缓存它们(参见 [Spark Programming Guide](programming-guide.html#rdd-persistence) )。GraphX 中的图形表现方式相同。**当多次使用图表时,请务必先调用 [`Graph.cache()`](api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED])。**
在 Spark 中,默认情况下,RDD 不会保留在内存中。为了避免重新计算,在多次使用它们时,必须明确缓存它们(参见 [Spark Programming Guide](programming-guide.html#rdd-persistence))。GraphX 中的图形表现方式相同。**当多次使用图表时,请务必先调用 [`Graph.cache()`](api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED])。**
在迭代计算中,_uncaching_ 也可能是最佳性能所必需的。默认情况下,缓存的 RDD 和图形将保留在内存中,直到内存压力迫使它们以 LRU 顺序逐出。对于迭代计算,来自先前迭代的中间结果将填满缓存。虽然它们最终被驱逐出来,但存储在内存中的不必要的数据会减慢垃圾收集速度。一旦不再需要中间结果,就会更有效率。这涉及每次迭代实现(缓存和强制)图形或 RDD,取消所有其他数据集,并且仅在将来的迭代中使用实例化数据集。然而,由于图形由多个 RDD 组成,所以很难将它们正确地分开。**对于迭代计算,我们建议使用Pregel API,它可以正确地解析中间结果。**
......@@ -546,7 +546,7 @@ class GraphOps[VD, ED] {
}
```
请注意,Pregel 需要两个参数列表(即:`graph.pregel(list1)(list2)`。第一个参数列表包含配置参数,包括初始消息,最大迭代次数以及发送消息的边缘方向(默认情况下为边缘)。第二个参数列表包含用于接收消息(顶点程序 `vprog`),计算消息( `sendMsg` )和组合消息的用户定义函数 `mergeMsg`
请注意,Pregel 需要两个参数列表(即:`graph.pregel(list1)(list2)`。第一个参数列表包含配置参数,包括初始消息,最大迭代次数以及发送消息的边缘方向(默认情况下为边缘)。第二个参数列表包含用于接收消息(顶点程序 `vprog`),计算消息(`sendMsg`)和组合消息的用户定义函数 `mergeMsg`
在以下示例中,我们可以使用 Pregel 运算符来表达单源最短路径的计算。
......@@ -592,7 +592,7 @@ object GraphLoader {
}
```
[`GraphLoader.edgeListFile`](api/scala/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]) 提供了从磁盘边缘列表中加载图形的方法。它解析以下形式的(源顶点 ID,目标顶点 ID )对的邻接列表,跳过以下开始的注释行 `#`
[`GraphLoader.edgeListFile`](api/scala/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]) 提供了从磁盘边缘列表中加载图形的方法。它解析以下形式的(源顶点 ID,目标顶点 ID)对的邻接列表,跳过以下开始的注释行 `#`
```
# This is a comment
......
......@@ -189,7 +189,7 @@ RDDs 是被一元素类型参数化的静态类型对象,比如,RDD[Int] 表
![](img/528d923f7f97cd08ede583ebf28f5f9a.jpg)
其中 links 表示( URL , outlinks )键值对。这个程序的 RDD 的血缘关系图如图三。在每一次迭代中我们都是根据上一次迭代的 contribs 和 ranks 以及原始不变的 links 数据集来创建一个新的 ranks 数据集。随着迭代次数的变多这张图会变的越长,这个是这个图比较有意思的特点。如果这个 job 的迭代次数很多的话,那么备份一些版本的 ranks 来达到减少从错误中恢复出来的时间是很有必要的,用户可以调用标记为 RELIABLE 的 persist 函数来达到这个目的。需要注意的是,links 是不需要备份的,因为它的分区数据可以快速的从重新计算输入文件中对应的数据块而得到,这个数据集一般会比 ranks 数据集大上很多倍,因为每一个文档会有很多的连接但只会有一个排名值,所以利用 RDD 的血缘关系来恢复数据肯定比 checkpoint 内存中的数据快很多(因为数据量太大).
其中 links 表示(URL , outlinks)键值对。这个程序的 RDD 的血缘关系图如图三。在每一次迭代中我们都是根据上一次迭代的 contribs 和 ranks 以及原始不变的 links 数据集来创建一个新的 ranks 数据集。随着迭代次数的变多这张图会变的越长,这个是这个图比较有意思的特点。如果这个 job 的迭代次数很多的话,那么备份一些版本的 ranks 来达到减少从错误中恢复出来的时间是很有必要的,用户可以调用标记为 RELIABLE 的 persist 函数来达到这个目的。需要注意的是,links 是不需要备份的,因为它的分区数据可以快速的从重新计算输入文件中对应的数据块而得到,这个数据集一般会比 ranks 数据集大上很多倍,因为每一个文档会有很多的连接但只会有一个排名值,所以利用 RDD 的血缘关系来恢复数据肯定比 checkpoint 内存中的数据快很多(因为数据量太大).
最后,我们可以控制 RDDs 的分区方式来优化 PageRank 中的节点通讯。如果我们事先为 links 指定一个分区方式(比如,根据 link 的 url 来 hash 分区,就是将相同的 url 发送到同一个节点中),然后我们对 ranks 进行相同的分区方式,这样就可以保证 links 和 ranks 之间的 join 不需要机器节点之间的通讯(因为相同的 url 都在同一个机器节点了,那么相对应的 rank 和 link 肯定也是在同一个机器节点了)。我们也可以自定义分区器来实现将一组页面 url 放到一起(比如按照 url 的 domain 进行分区)。以上两种优化方式都可以通过在定义 links 的时候调用 partitionBy 来实现:
......@@ -234,13 +234,13 @@ Spark 中的这些 RDDs 的通用接口使的实现很多 transformations 操作
Spark 可以利用已经存在的 hadoop 的 api 组件读取任何的 hadoop 的输入数据源(比如:HDFS 和 Hbase 等),这个程序 api 是运行在没有更改的 scala 版本上.
我们会简要的概括下几个比较有意思的技术点:我们的 job 调度器( 5.1 节),可以用于交互的 spark 解释器( 5.2 节),内存管理( 5.3 节)以及对 checkpointing 的支持( 5.4 节).
我们会简要的概括下几个比较有意思的技术点:我们的 job 调度器(5.1 节),可以用于交互的 spark 解释器(5.2 节),内存管理(5.3 节)以及对 checkpointing 的支持(5.4 节).
### 5.1 job 调度器
spark 的调度器依赖我们在第 4 章中讨论的 RDDs 的表达.
从总体上看,我们的调度系统有点和 Dryad 相似,但是它还考虑了被存储的 RDDs 的哪些分区还在内存中.当一个用户对某个 RDD 调用了 action 操作(比如 count 或者 save )的时候调度器会检查这个 RDD 的血缘关系图,然后根据这个血缘关系图构建一个含有 stages 的有向无环图( DAG ),最后按照步骤执行这个 DAG 中的 stages,如图 5 的说明.每一个 stage 包含了尽可能多的带有窄依赖的 transformations 操作。这个 stage 的划分是根据需要 shuffle 操作的宽依赖或者任何可以切断对父亲 RDD 计算的某个操作(因为这些父亲 RDD 的分区已经计算过了)。然后调度器可以调度启动 tasks 来执行没有父亲 stage 的 stage(或者父亲 stage 已经计算好了的 stage ),一直到计算完我们的最后的目标 RDD .
从总体上看,我们的调度系统有点和 Dryad 相似,但是它还考虑了被存储的 RDDs 的哪些分区还在内存中.当一个用户对某个 RDD 调用了 action 操作(比如 count 或者 save)的时候调度器会检查这个 RDD 的血缘关系图,然后根据这个血缘关系图构建一个含有 stages 的有向无环图(DAG),最后按照步骤执行这个 DAG 中的 stages,如图 5 的说明.每一个 stage 包含了尽可能多的带有窄依赖的 transformations 操作。这个 stage 的划分是根据需要 shuffle 操作的宽依赖或者任何可以切断对父亲 RDD 计算的某个操作(因为这些父亲 RDD 的分区已经计算过了)。然后调度器可以调度启动 tasks 来执行没有父亲 stage 的 stage(或者父亲 stage 已经计算好了的 stage),一直到计算完我们的最后的目标 RDD .
![](img/9a1ca367603942cde5c30e70396e8fa3.jpg) 图五:怎么计算 spark job stage 的例子.实现的方框表示 RDDs ,带有颜色的方形表示分区,黑色的是表示这个分区的数据存储在内存中,对 RDD G 调用 action 操作,我们根据宽依赖生成很多 stages,且将窄依赖的 transformations 操作放在 stage 中.在这个场景中,stage 1 的输出结果已经在内存中,所以我们开始运行 stage 2,然后是 stage 3.
......@@ -248,7 +248,7 @@ spark 的调度器依赖我们在第 4 章中讨论的 RDDs 的表达.
对于宽依赖(比如 shuffle 依赖),我们将中间数据写入到节点的磁盘中以利于从错误中恢复,这个和 MapReduce 将 map 后的结果写入到磁盘中是很相似的.
只要一个任务所在的 stage 的父亲 stage 还是有效的话,那么当这个 task 失败的时候,我们就可以在其他的机器节点中重新跑这个任务。如果一些 stages 变的无效的话(比如因为一个 shuffle 过程中 map 端的一个输出结果丢失了),我们需要重新并行提交没有父亲 stage 的 stage(或者父亲 stage 已经计算好了的 stage )的计算任务。虽然备份 RDD 的血缘关系图示比较容易的,但是我们还不能容忍调度器调度失败的场景.
只要一个任务所在的 stage 的父亲 stage 还是有效的话,那么当这个 task 失败的时候,我们就可以在其他的机器节点中重新跑这个任务。如果一些 stages 变的无效的话(比如因为一个 shuffle 过程中 map 端的一个输出结果丢失了),我们需要重新并行提交没有父亲 stage 的 stage(或者父亲 stage 已经计算好了的 stage)的计算任务。虽然备份 RDD 的血缘关系图示比较容易的,但是我们还不能容忍调度器调度失败的场景.
虽然目前 spark 中所有的计算都是响应 driver 程序中调用的 action 操作,但是我们也是需要尝试在集群中调用 lookup 操作,这种操作是根据 key 来随机访问已经 hash 分区过的 RDD 所有元素以获取相应的 value。在这种场景中,如果一个分区没有计算的话,那么 task 需要将这个信息告诉调度器.
......@@ -261,7 +261,7 @@ Scala 解释器通常是将用户输入的每一行代码编译成一个类,
我们对 spark 中的解释器做了如下两个改变:
1. Class shipping:为了让 worker 节点能拿到用户输入的每一行代码编译成的 class 的二进制代码,我们使的解释器为这些 classes 的二进制代码提供 HTTP 服务.
2. 修改了代码生成:正常情况下,我们通过访问对应的类的静态方法来达到访问将用户输入每一行代码编译成的单例对象.这个以为着,当我们将一个含有在前面行中定义的变量(比如上面例子中的 Line 1.x )的闭包序列化发送到 worker 节点的时候,java 是不会通过对象图来跟踪含有 x 的实力 Line 1 的,这样的话 worker 节点将收不到变量 x.我们修改了代码生成逻辑来达到能直接引用每一行代码生成的实例.
2. 修改了代码生成:正常情况下,我们通过访问对应的类的静态方法来达到访问将用户输入每一行代码编译成的单例对象.这个以为着,当我们将一个含有在前面行中定义的变量(比如上面例子中的 Line 1.x)的闭包序列化发送到 worker 节点的时候,java 是不会通过对象图来跟踪含有 x 的实力 Line 1 的,这样的话 worker 节点将收不到变量 x.我们修改了代码生成逻辑来达到能直接引用每一行代码生成的实例.
图六显示了经过我们的改变后,解释器是如何将用户输入的一系列的代码转换成 java 对象.
......@@ -283,7 +283,7 @@ Spark 在持久化 RDDs 的时候提供了 3 种存储选:存在内存中的
一般来说,checkpointing 对具有很长的血缘关系链且包含了宽依赖的 RDDs 是非常有用的,比如我们在 3.2.2 小节中提到的 PageRank 的例子。在这些场景下,集群中的某个节点的失败会导致每一个父亲 RDD 的一些数据的丢失,进而需要重新所有的计算。与此相反的,对于存储在稳定存储系统中且是窄依赖的 RDDs(比如 3.2.1 小节中线性回归例子中的 points 和 PageRank 中的 link 列表数据),checkpointing 可能一点用都没有。如果一个节点失败了,我们可以在其他的节点中并行的重新计算出丢失了数据的分区,这个成本只是备份整个 RDD 的成本的一点点而已.
spark 目前提供了一个 checkpointing 的 api( persist 中的标识为 REPLICATE,还有 checkpoint()),但是需要将哪些数据需要 checkpointing 的决定权留给了用户。然而,我们也在调查怎么样自动的 checkpoing,因为我们的调度系统知道数据集的大小以及第一次计算这个数据集花的时间,所以有必要选择一些最佳的 RDDs 来进行 checkpointing,来达到最小化恢复时间
spark 目前提供了一个 checkpointing 的 api(persist 中的标识为 REPLICATE,还有 checkpoint()),但是需要将哪些数据需要 checkpointing 的决定权留给了用户。然而,我们也在调查怎么样自动的 checkpoing,因为我们的调度系统知道数据集的大小以及第一次计算这个数据集花的时间,所以有必要选择一些最佳的 RDDs 来进行 checkpointing,来达到最小化恢复时间
最后,需要知道的事 RDDs 天生的只读的特性使的他们比一般的共享内存系统做 checkpointing 更简单了。因为不用考虑数据的一致性,我们可以不终止程序或者 take 快照,然后在后台将 RDDs 的数据写入到存储系统中.
......@@ -296,7 +296,7 @@ spark 目前提供了一个 checkpointing 的 api( persist 中的标识为 REP
* 当节点失败的时候,spark 可以通过重新计算失去的 rdd 分区数据达到快速的恢复.
* spark 在查询 1 TB 的数据的时候的延迟可以控制在 5 到 7 秒.
我们通过和 hadoop 对比,展示迭代式机器学习( 6.1 节)和 PageRank( 6.2 节)的基准测试.然后我们评估了 spark 的错误恢复机制( 6.3 节)以及当内存不足以存储一个数据集的行为( 6.4 节),最后我们讨论了用户应用( 6.5 节)和交互式数据挖掘( 6.6 节)的结果 除非另外声明,我们都是用类型为 m 1.xlarge 的 EC 2 节点,4 核以及 15 GB 内存。我们是有数据块大小为 256 M 的 HDFS 存储系统。在每一次测试之前,我们都会清理 OS 的缓存,以达到准确的测量 IO 成本的目的
我们通过和 hadoop 对比,展示迭代式机器学习(6.1 节)和 PageRank(6.2 节)的基准测试.然后我们评估了 spark 的错误恢复机制(6.3 节)以及当内存不足以存储一个数据集的行为(6.4 节),最后我们讨论了用户应用(6.5 节)和交互式数据挖掘(6.6 节)的结果 除非另外声明,我们都是用类型为 m 1.xlarge 的 EC 2 节点,4 核以及 15 GB 内存。我们是有数据块大小为 256 M 的 HDFS 存储系统。在每一次测试之前,我们都会清理 OS 的缓存,以达到准确的测量 IO 成本的目的
### 6.1 迭代式机器学习应用
......@@ -314,7 +314,7 @@ spark 目前提供了一个 checkpointing 的 api( persist 中的标识为 REP
![](img/f7825d58d347fcf078aa4696b00e2a0b.jpg) 图八: hadoop 、 hadoopBinMem 以及 spark 在随后的迭代花的时间,都是处理 100 G 的数据
**理解为什么提速了**:我们惊奇的发现 spark 甚至比基于内存存储二进制数据的 hadoopBinMem 还要快 20 倍。在 hadoopBinMem 中,我们使用的是 hadoop 标准的二进制文件格式( sequenceFile )和 256 m 这么大的数据块大小,以及我们强制将 hadoop 的数据目录放在一个内存的文件系统中。然而,Hadoop 仍然因为下面几点而比 spark 慢:
**理解为什么提速了**:我们惊奇的发现 spark 甚至比基于内存存储二进制数据的 hadoopBinMem 还要快 20 倍。在 hadoopBinMem 中,我们使用的是 hadoop 标准的二进制文件格式(sequenceFile)和 256 m 这么大的数据块大小,以及我们强制将 hadoop 的数据目录放在一个内存的文件系统中。然而,Hadoop 仍然因为下面几点而比 spark 慢:
1. Hadoop 软件栈的最低开销.
2. HDFS 提供数据服务的开销.
......@@ -322,7 +322,7 @@ spark 目前提供了一个 checkpointing 的 api( persist 中的标识为 REP
我们依次来调查上面的每一个因素.为了测量第一个因素,我们跑了一些空的 hadoop 任务,我们发现单单完成 job 的设置、任务的启动以及任务的清理等工作就花掉了至少 25 秒钟。对于第二个元素,我们发现 HDFS 需要执行多份内存数据的拷贝以及为每一个数据块做 checksum 计算.
最后,为了测试第 3 个因素,我们在单机上做了一个微型的基准测试,就是针对不同文件类型的 256 M 数据来跑线性回归计算。我们特别的对比了分别从 HDFS 文件( HDFS 技术栈的耗时将会很明显)和本地内存文件(内核可以很高效的将数据传输给应用程序)中处理文本和二进制类型数据所话的时间、
最后,为了测试第 3 个因素,我们在单机上做了一个微型的基准测试,就是针对不同文件类型的 256 M 数据来跑线性回归计算。我们特别的对比了分别从 HDFS 文件(HDFS 技术栈的耗时将会很明显)和本地内存文件(内核可以很高效的将数据传输给应用程序)中处理文本和二进制类型数据所话的时间、
图九中是我们我们测试结果的展示。从 In - memory HDFS(数据是在本地机器中的内存中)中读数据比从本地内存文件中读数据要多花费 2 秒中.解析文本文件要比解析二进制文件多花费 7 秒钟。最后,即使从本地内存文件中读数据,但是将预先解析了二进制数据转换成 java 对象也需要 3 秒钟,这个对于线性回归来说也是一个非常耗时的操作。Spark 将 RDDs 所有元素以 java 对象的形式存储在内存中,进而避免了上述说的所有的耗时
......@@ -354,7 +354,7 @@ spark 目前提供了一个 checkpointing 的 api( persist 中的标识为 REP
**内存中分析**: Conviva Inc 是一个视频提供商,他们用 spark 来加速之前在 hadoop 上运行的几个数据报表分析。比如,其中一个报表是运行一系列的 Hive 查询来计算一个用户的各种统计信息。这些查询都是基于相同的数据子集(基于自定义的过滤器过滤出来的数据)但是需要很多 MapReduce 任务来为分组字段进行聚合运算(平均值、百分位数值以及 count distinct)。将这些数据子集创建成一个可以共享的 spark 中的 RDD 来实现这些查询使的这个报表的速度提升了 40 倍。对 200 GB 已经压缩的数据在 hadoop 集群上跑这个报表花了 20 个小时,但是利用 2 台机器的 spark 只用了 30 分钟而已。此外,spark 程序只花了 96 G 的内存,因为只需要将报表关心的列数据存储在内存中进行共享就行,而不是所有的解压后的数据.
**交通模型**:伯克利分校的 Mobile Millennium 项目组的研究员在收集到的零星的汽车的 GPS 信息上并行运行一个机器学习算法试图推断出道路交通是否拥挤。在都市区域道路网络中的 10000 条道路和 600000 个装有 GPS 设备的汽车点对点的旅行时间(每一条路线的旅行时间可能包含了多条道路)样本是数据源。利用交通模型可以估算出通过每一条独立的道路需要多长时间。研究人员利用 EM 算法来训练模型,这个算法在迭代的过程中重复执行 map 和 reduceByKey 步骤。这个应用近似线性的将机器规模从 20 台扩展到 80 台,每台机器 4 个 cores,如图 13( a )所示.
**交通模型**:伯克利分校的 Mobile Millennium 项目组的研究员在收集到的零星的汽车的 GPS 信息上并行运行一个机器学习算法试图推断出道路交通是否拥挤。在都市区域道路网络中的 10000 条道路和 600000 个装有 GPS 设备的汽车点对点的旅行时间(每一条路线的旅行时间可能包含了多条道路)样本是数据源。利用交通模型可以估算出通过每一条独立的道路需要多长时间。研究人员利用 EM 算法来训练模型,这个算法在迭代的过程中重复执行 map 和 reduceByKey 步骤。这个应用近似线性的将机器规模从 20 台扩展到 80 台,每台机器 4 个 cores,如图 13(a)所示.
![](img/c4abf9f69923cfef574f0e484b041d61.jpg) 图十三:两个用 spark 实现的用户应用的每次迭代的时间,误差线表示标准误差
......@@ -362,24 +362,24 @@ spark 目前提供了一个 checkpointing 的 api( persist 中的标识为 REP
### 6.6 交互性的数据挖掘
为了演示 spark 在交互查询大数据集的能力,我们来分析 1 TB 的维基页面访问日志数据( 2 年的数据)。在这个实验中,我们使用 100 个 m 2.4 xlarge EC 2 实例,每一个实例 8 个 cores 以及 68 G 内存。我们查询出( 1 )所有页面的浏览量,( 2 )页面标题等于某个单词的页面的浏览量以及( 3 )页面标题部分的等于某个单词的页面的浏览量。每一个查询都是扫描整个输入数据集.
为了演示 spark 在交互查询大数据集的能力,我们来分析 1 TB 的维基页面访问日志数据(2 年的数据)。在这个实验中,我们使用 100 个 m 2.4 xlarge EC 2 实例,每一个实例 8 个 cores 以及 68 G 内存。我们查询出(1)所有页面的浏览量,(2)页面标题等于某个单词的页面的浏览量以及(3)页面标题部分的等于某个单词的页面的浏览量。每一个查询都是扫描整个输入数据集.
图十四展示的分别是查询整个数据集、一半数据集一集十分之一的数据集的响应时间。即使是 1 TB 的数据,用 spark 来查询仅仅花了 5-7 秒而已.这个比查询磁盘数据的速度快了一个数量级,比如,查询磁盘文件中的 1 TB 数据需要 170 秒.这个可以说明 RDDs 使的 spark 是一个非常强大的交互型数据挖掘的工具.
## 7 讨论
虽然由于 RDDs 的天然不可变性以及粗粒度的转换导致它们似乎提供了有限制的编程接口,但是我们发现它们适合很多类型的应用。特别的,RDDs 可以表达出现在各种各样的框架提出的编程模型,而且还可以将这些模型组合在同一个程序中(比如跑一个 MapReduce 任务来创建一个图,然后基于这个图来运行 Pregel )以及可以在这些模型中共享数据。在这一章中,我们在第 7.1 节中讨论 RDDs 可以表达哪些模型以及为什么适合表达这些编程模型。另外,我们在第 7.2 节中讨论我们推崇的 RDD 的血缘信息的好处,利用这些信息可以帮助我们 debug 模型.
虽然由于 RDDs 的天然不可变性以及粗粒度的转换导致它们似乎提供了有限制的编程接口,但是我们发现它们适合很多类型的应用。特别的,RDDs 可以表达出现在各种各样的框架提出的编程模型,而且还可以将这些模型组合在同一个程序中(比如跑一个 MapReduce 任务来创建一个图,然后基于这个图来运行 Pregel)以及可以在这些模型中共享数据。在这一章中,我们在第 7.1 节中讨论 RDDs 可以表达哪些模型以及为什么适合表达这些编程模型。另外,我们在第 7.2 节中讨论我们推崇的 RDD 的血缘信息的好处,利用这些信息可以帮助我们 debug 模型.
### 7.1 已经存在的编程模型的表达
对于到目前为止很多独立提出的编程模型,RDDs 都可以高效的表达出来。这里所说的 “高效”,不仅仅是指使用 RDDs 的输出结果和独立提出的编程模型狂简的输出结果是一致的,而且 RDDs 在优化性能方面比这些框架还要强大,比如将特定的数据保存在内存中、对数据分区以减少网络传输以及高效的从错误中恢复。可以用 RDDs 表达的模型如下:
* MapReduce:可以利用 spark 中的 flatMap 和 groupByKey 操作来表达这个模型,或者如果需要聚合的话可以使用 reduceByKey .
* DryadLINQ:DryadLINQ 系统比 MapReduce 更多的操作,但是这些操作都是直接和 RDD 的转换操作( map,groupByKey,join 等)对应的批量操作.
* DryadLINQ:DryadLINQ 系统比 MapReduce 更多的操作,但是这些操作都是直接和 RDD 的转换操作(map,groupByKey,join 等)对应的批量操作.
* SQL:和 DryadLINQ 一样,SQL 查询都是对一个数据集进行并行的操作计算.
* Pregel:Google 的 Pregel 是一个专门解决迭代图计算应用的模型,它一开始看起来和面向数据集的编程模型的其他系统完全不同.在 Pregel 中,一个程序运行一些列的相互协调的“ supersteps ”.在每一个 superstep 上,对图上的每一个顶点运行用户自定义的函数来更新这个顶点的相关的状态、改变图的拓扑结构以及向其他顶点发送下一个 superstep 需要的消息.这种模型可以表达非常多的图计算算法,包括最短路径、二部图匹配以及 PageRank.
Pregel 在每一次迭代中都是对所有顶点应用相同的用户定义的函数,这个是使的我们用 RDDs 来实现这个模型的关键点。因此,每次迭代后,我们都可以将顶点的状态保存在 RDD 中,然后执行一个批量转换操作( apply )来应用这个函数以及生成一个消息的 RDD。然后我们可以用这个 RDD 通顶点的状态进行 join 来完成消息的交换。和 Pregel 一样,RDDs 允许将点的状态保存在内存中、控制它们的分区以减少网络通讯以及指出从失败中恢复。我们在 spark 上用了 200 行代码的包实现了 Pregel,读者可以参考第 33 个文献来了解更多的细节
Pregel 在每一次迭代中都是对所有顶点应用相同的用户定义的函数,这个是使的我们用 RDDs 来实现这个模型的关键点。因此,每次迭代后,我们都可以将顶点的状态保存在 RDD 中,然后执行一个批量转换操作(apply)来应用这个函数以及生成一个消息的 RDD。然后我们可以用这个 RDD 通顶点的状态进行 join 来完成消息的交换。和 Pregel 一样,RDDs 允许将点的状态保存在内存中、控制它们的分区以减少网络通讯以及指出从失败中恢复。我们在 spark 上用了 200 行代码的包实现了 Pregel,读者可以参考第 33 个文献来了解更多的细节
* 迭代 MapReduce:最近提出的几个系统,包括 HaLoop 和 Twister,它们提供了可以让用户循环跑一系列的 MapReduce 任务的迭代式 MapReduce 模型.这些系统在迭代之间保持数据分区一致,Twister 也可以将数据保存在内存中。RDDs 可以很简单的表达以上两个优化,而且我们基于 spark 花了 200 行代码实现了 HaLoop.
* 批量流处理: 研究人员最近提出了一些增量处理系统,这些系统是为定期接受新数据然后根据数据更新结果的应用服务的.比如,一个应用需要实时接收新数据,然后每 15 分钟就将接收到的数据和前面 15 分钟的时间窗口的数据进行 join 聚合,将聚合的结果更新到统计数据中.这些系统执行和 Dryad 类似的批处理,但是它们将应用的状态数据存储在分布式系统中.将中间结果放在 RDDs 中可以提高处理速度.
......@@ -402,7 +402,7 @@ Pregel 在每一次迭代中都是对所有顶点应用相同的用户定义的
* 第一,像 MapReduce,Dryad 以及 Ciel 一样支持一系列处理数据的操作,并且需要通过稳定的存储系统来共享数据,RDDs 表达了一种比稳定存储系统更高效的数据共享抽象,因为它避免了数据备份、 I / O 以及序列化的成本.
* 第二,几个数据流系统的高层面上的编程接口,包括 DryadLINQ 和 FlumeJava ,它们提供了语言集成 api,使的用户可以通过像 map 和 join 等操作来操作并行的集合.然而,在这些系统中,并行的集合是指在磁盘上的文件或者一个查询计划表达的临时数据集.虽然这些系统在相同的查询中的操作之间组装数据的 pipeline(比如,一个 map 操作后跟另外一个 map ),但是它们不能在查询之间进行高效的数据共享.我们在并行集合模式上建立 spark api,是由于它的便利性以及在集成语言接口上不要求新颖性,但是我们基于在这些接口背后以 RDDs 作为存储抽象,就可以使的 spark 支持大量类型的应用了.
* 第二,几个数据流系统的高层面上的编程接口,包括 DryadLINQ 和 FlumeJava ,它们提供了语言集成 api,使的用户可以通过像 map 和 join 等操作来操作并行的集合.然而,在这些系统中,并行的集合是指在磁盘上的文件或者一个查询计划表达的临时数据集.虽然这些系统在相同的查询中的操作之间组装数据的 pipeline(比如,一个 map 操作后跟另外一个 map),但是它们不能在查询之间进行高效的数据共享.我们在并行集合模式上建立 spark api,是由于它的便利性以及在集成语言接口上不要求新颖性,但是我们基于在这些接口背后以 RDDs 作为存储抽象,就可以使的 spark 支持大量类型的应用了.
* 第三种系统为许多专门的需要数据共享的应用提供了高层的接口.比如,pregel 支持迭代式的图计算应用、 Twister 和 HaLoop 支持迭代式的 MapReduce .然而,这些框架只是为他们支持的计算模型隐式的共享数据,并没有提供可以让用户根据自己的需求显式的共享数据的通用抽象.比如,一个用户不能用 Pregel 或者 Twister 将数据加载到内存中然后决定在数据集上面跑什么样的查询。RDDs 提供了一个显式的分布式存储抽象,因此可以支持那些特殊系统不能支持的应用,比如交互式数据挖掘.
最后,一些系统暴露共享可变状态以使的用户可以执行内存计算。比如,Piccolo 使的用户通过分布式的函数来读取和更新分布式 hash 表中的单元格数据。DSM 和像 RAMCloud 这样的 key - value 存储系统提供了类似的模型。RDDs 和这些系统有两个方面的不同,第一,RDDs 基于像 map,sot 以及 join 等操作提供了高层的编程接口,然而,在 Piccolo 和 DSM 中的接口只是读取和更新表的单元格数据。第二,Piccolo 和 DSM 通过 checkpoint 和回滚机制实现容错,在许多应用中这种机制比机遇血缘机制的 RDDs 的容错的成本更大。最后,如 2.3 小节讨论的,相对于 DSM,RDDs 还提供了其他的优势功能,比如执行慢的 task 的处理机制
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册