提交 e47e92ad 编写于 作者: 取昵称好难啊's avatar 取昵称好难啊

" ," to ","

上级 08406520
......@@ -27,7 +27,7 @@ Spark 除了运行在 Mesos 或者 YARN 上以外,Spark 还提供了一个简
./sbin/start-master.sh
```
一旦启动,master 将会为自己打印出一个 `spark://HOST:PORT` URL,您可以使用它来连接 workers,或者像传递 “master” 参数一样传递到 `SparkContext`。您在 master 的web UI 上也会找到这个 URL ,默认情况下是 [http://localhost:8080](http://localhost:8080)
一旦启动,master 将会为自己打印出一个 `spark://HOST:PORT` URL,您可以使用它来连接 workers,或者像传递 “master” 参数一样传递到 `SparkContext`。您在 master 的web UI 上也会找到这个 URL,默认情况下是 [http://localhost:8080](http://localhost:8080)
类似地,您可以启动一个或多个 workers 并且通过下面的代码连接到 master :
......@@ -43,8 +43,8 @@ Spark 除了运行在 Mesos 或者 YARN 上以外,Spark 还提供了一个简
| --- | --- |
| `-h HOST``--host HOST` | 监听的 Hostname |
| `-i HOST``--ip HOST` | 监听的 Hostname (已弃用,请使用 -h or --host) |
| `-p PORT``--port PORT` | 监听的服务 Port (端口) (默认: master 是 7077 , worker 是随机的) |
| `--webui-port PORT` | web UI 的端口(默认: master 是 8080 , worker 是 8081) |
| `-p PORT``--port PORT` | 监听的服务 Port (端口) (默认: master 是 7077, worker 是随机的) |
| `--webui-port PORT` | web UI 的端口(默认: master 是 8080, worker 是 8081) |
| `-c CORES``--cores CORES` | Spark 应用程序在机器上可以使用的全部的 CPU 核数(默认是全部可用的);这个选项仅在 worker 上可用 |
| `-m MEM``--memory MEM` | Spark 应用程序可以使用的内存数量,格式像 1000M 或者 2G(默认情况是您的机器内存数减去 1 GB);这个选项仅在 worker 上可用 |
| `-d DIR``--work-dir DIR` | 用于 scratch space (暂存空间)和作业输出日志的目录(默认是:SPARK_HOME/work);这个选项仅在 worker 上可用 |
......@@ -52,7 +52,7 @@ Spark 除了运行在 Mesos 或者 YARN 上以外,Spark 还提供了一个简
# 集群启动脚本
要使用启动脚本启动 Spark standalone 集群,你应该首先在 Spark 目录下创建一个叫做 conf/slaves 的文件,这个文件中必须包含所有你想要启动的 Spark workers 的机器的 hostname ,每个 hostname 占一行。如果 conf/slaves 不存在,启动脚本默认启动单个机器(localhost),这对于测试是有效的。注意, master 机器通过 ssh 访问所有的 worker 机器。默认情况下,ssh 是 parallel (并行)运行的并且需要配置无密码(使用一个私钥)的访问。如果您没有设置无密码访问,您可以设置环境变量 SPARK_SSH_FOREGROUND 并且为每个 worker 提供一个密码。
要使用启动脚本启动 Spark standalone 集群,你应该首先在 Spark 目录下创建一个叫做 conf/slaves 的文件,这个文件中必须包含所有你想要启动的 Spark workers 的机器的 hostname,每个 hostname 占一行。如果 conf/slaves 不存在,启动脚本默认启动单个机器(localhost),这对于测试是有效的。注意, master 机器通过 ssh 访问所有的 worker 机器。默认情况下,ssh 是 parallel (并行)运行的并且需要配置无密码(使用一个私钥)的访问。如果您没有设置无密码访问,您可以设置环境变量 SPARK_SSH_FOREGROUND 并且为每个 worker 提供一个密码。
一旦您创建了这个文件,您就可以启动或者停止您的集群使用下面的 shell 脚本,基于 Hadoop 的部署脚本,并在 `SPARK_HOME/sbin` 中可用:
......@@ -97,7 +97,7 @@ SPARK_MASTER_OPTS 支持以下系统属性:
|
| `spark.deploy.spreadOut` | true | 这个选项控制 standalone 集群 manager 是应该跨界店 spread (传播)应用程序还是应该努力将应用程序整合到尽可能少的节点上。在 HDFS 中, Spreading 是数据本地化的更好的选择,但是对于计算密集型的负载,整合会更有效率。
|
| `spark.deploy.defaultCores` | (infinite) | 如果没有设置 `spark.cores.max` ,在 Spark 的 standalone 模式下默认分配给应用程序的 cores (核)数。如果没有设置,应用程序将总是获得所有的可用核,除非设置了 `spark.cores.max`。在共享集群中设置较低的核数,可用于防止用户 grabbing (抓取)整个集群。
| `spark.deploy.defaultCores` | (infinite) | 如果没有设置 `spark.cores.max`,在 Spark 的 standalone 模式下默认分配给应用程序的 cores (核)数。如果没有设置,应用程序将总是获得所有的可用核,除非设置了 `spark.cores.max`。在共享集群中设置较低的核数,可用于防止用户 grabbing (抓取)整个集群。
|
| `spark.deploy.maxExecutorRetries` | 10 | 限制在 standalone 集群 manager 删除一个不正确地应用程序之前可能发生的 back-to-back 执行器失败的最大次数。如果一个应用程序有任何正在运行的执行器,则它永远不会被删除。如果一个应用程序经历过超过 `spark.deploy.maxExecutorRetries` 次的连续失败,没有执行器成功开始运行在这些失败之间,并且应用程序没有运行着的执行器,然后 standalone 集群 manager 将会移除这个应用程序并将它标记为失败。要禁用这个自动删除功能,设置`spark.deploy.maxExecutorRetries``-1`
|
......@@ -116,7 +116,7 @@ SPARK_WORKER_OPTS 支持以下的系统属性:
要在 Spark 集群中运行一个应用程序,只需要简单地将 master 的 `spark://IP:PORT` URL 传递到 [`SparkContext` constructor](programming-guide.html#initializing-spark)
要针对集群运行交互式 Spark shell ,运行下面的命令:
要针对集群运行交互式 Spark shell,运行下面的命令:
```
./bin/spark-shell --master spark://IP:PORT
......@@ -128,7 +128,7 @@ SPARK_WORKER_OPTS 支持以下的系统属性:
[`spark-submit` 脚本](submitting-applications.html) 提供了最简单的方法将一个编译好的 Spark 应用程序提交到集群中。对于 standalone 集群,Spark 目前支持两种部署模式。在 `client` 模式下,driver 在与 client 提交应用程序相同的进程中启动。在 `cluster` 模式下,driver 是集群中的某个 Worker 中的进程中启动,并且 client 进程将会在完成提交应用程序的任务之后退出,而不需要等待应用程序完成再退出。
如果您的应用程序是通过 Spark 提交来启动的,则应用程序的 jar 将自动启动分发给所有的 worker 节点。对于您的应用程序依赖的其他的 jar ,您应该通过 `--jars` 标志使用逗号作为分隔符(例如 `--jars jar1,jar2`)来指定它们。要控制应用程序的配置或执行环境,请参阅 [Spark Configuration](configuration.html)
如果您的应用程序是通过 Spark 提交来启动的,则应用程序的 jar 将自动启动分发给所有的 worker 节点。对于您的应用程序依赖的其他的 jar,您应该通过 `--jars` 标志使用逗号作为分隔符(例如 `--jars jar1,jar2`)来指定它们。要控制应用程序的配置或执行环境,请参阅 [Spark Configuration](configuration.html)
另外,standalone `cluster` 模式支持自动重新启动应用程序如果它是以非零的退出码退出的。要使用此功能,您可以在启动您的应用程序的时候将 `--supervise` 标志传入 `spark-submit`。然后,如果您想杀死一个重复失败的应用程序,您可以使用如下方式:
......@@ -170,7 +170,7 @@ export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
Spark 的 standalone 模式提供了一个基于 web 的用户接口来监控集群。master 和每个 worker 都有它自己的显示集群和作业信息的 web UI。默认情况下,您可以通过 master 的 8080 端口来访问 web UI。这个端口可以通过配置文件修改或者通过命令行选项修改。
此外,对于每个作业的详细日志输出也会写入到每个 slave 节点的工作目录中。(默认是 `SPARK_HOME/work`)。你会看到每个作业的两个文件,分别是 `stdout``stderr` ,其中所有输出都写入其控制台。
此外,对于每个作业的详细日志输出也会写入到每个 slave 节点的工作目录中。(默认是 `SPARK_HOME/work`)。你会看到每个作业的两个文件,分别是 `stdout``stderr`,其中所有输出都写入其控制台。
# 与 Hadoop 集成
......@@ -196,7 +196,7 @@ Spark 对网络的需求比较高,并且一些环境对于使用严格的防
为了启用这个恢复模式,您可以在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS 通过配置 `spark.deploy.recoveryMode` 和相关的 spark.deploy.zookeeper.* 配置。有关这些配置的更多信息,请参阅 [配置文档](configuration.html#deploy)
可能的陷阱:如果您在您的集群中有多个 Masters 但是没有正确地配置 Masters 使用 ZooKeeper , Masters 将无法相互发现,并认为它们都是 leader。这将不会形成一个健康的集群状态(因为所有的 Masters 将会独立调度)。
可能的陷阱:如果您在您的集群中有多个 Masters 但是没有正确地配置 Masters 使用 ZooKeeper, Masters 将无法相互发现,并认为它们都是 leader。这将不会形成一个健康的集群状态(因为所有的 Masters 将会独立调度)。
**细节**
......@@ -227,4 +227,4 @@ ZooKeeper 是生产级别的高可用性的最佳方法,但是如果您只是
* 该解决方案可以与像 [monit](http://mmonit.com/monit/) 这样的过程 monitor/manager 一起使用,或者只是通过重新启动手动恢复。
* 尽管文件系统恢复似乎比完全没有任何恢复更好,但是对于某些特定的开发或者实验目的,此模式可能不太适合。特别是,通过 stop-master.sh 杀死 master 并不会清除其恢复状态,所以每当重新启动一个新的 Master 时,它将进入恢复模式。如果需要等待所有先前注册的 Worker/clients 超时,这可能会将启动时间增加 1 分钟。
* 虽然没有正式的支持,你也可以挂载 NFS 目录作为恢复目录。如果 original Master w安全地死亡,则您可以在不同的节点上启动 Master ,这将正确恢复所有以前注册的 Workers/applications (相当于 ZooKeeper 恢复)。然而,未来的应用程序必须能够找到新的 Master 才能注册。
\ No newline at end of file
* 虽然没有正式的支持,你也可以挂载 NFS 目录作为恢复目录。如果 original Master w安全地死亡,则您可以在不同的节点上启动 Master,这将正确恢复所有以前注册的 Workers/applications (相当于 ZooKeeper 恢复)。然而,未来的应用程序必须能够找到新的 Master 才能注册。
\ No newline at end of file
......@@ -47,7 +47,7 @@ Spark 2.2.0 专门为 Mesos 1.0.0 或更新的版本并且不需要 Mesos 的任
## 从源码安装
通过源码安装 Apache Mesos ,请按照以下步骤:
通过源码安装 Apache Mesos,请按照以下步骤:
1. 从镜像网站 [mirror](http://www.apache.org/dyn/closer.lua/mesos/1.0.0/) 下载一个 Mesos 发行版。
2. 按照 Mesos 的 [快速开始](http://mesos.apache.org/gettingstarted) 页面来编译和安装 Mesos。
......@@ -67,17 +67,17 @@ Mesosphere 安装文档建议安装 ZooKeeper 来处理 Mesos master 故障切
## 验证
要验证 Mesos 集群是否已经准备好用于 Spark ,请导航到 Mesos master 的 webui 界面,端口是: `:5050` 来确认所有预期的机器都在 slaves 选项卡中。
要验证 Mesos 集群是否已经准备好用于 Spark,请导航到 Mesos master 的 webui 界面,端口是: `:5050` 来确认所有预期的机器都在 slaves 选项卡中。
# 连接 Spark 到 Mesos
要使用 Spark 中的 Mesos ,您需要一个 Spark 的二进制包放到 Mesos 可以访问的地方,然后一个 Spark driver 程序配置来连接 Mesos。
要使用 Spark 中的 Mesos,您需要一个 Spark 的二进制包放到 Mesos 可以访问的地方,然后一个 Spark driver 程序配置来连接 Mesos。
或者,您也可以将 Spark 安装在所有 Mesos slaves 中的相同位置,并且配置 `spark.mesos.executor.home` (默认是 SPARK_HOME)来指向该位置。
## 上传 Spark 包
当 Mesos 第一次在 Mesos slave 上运行任务的时候,这个 slave 必须有一个 Spark binary package (Spark 二进制包)用于执行 Spark Mesos executor backend (执行器后端)。Spark 软件包可以在任何 Hadoop 可访问的 URI 上托管,包括 HTTP 通过 `http://`[Amazon Simple Storage Service](http://aws.amazon.com/s3) 通过 `s3n://` ,或者 HDFS 通过 `hdfs://`
当 Mesos 第一次在 Mesos slave 上运行任务的时候,这个 slave 必须有一个 Spark binary package (Spark 二进制包)用于执行 Spark Mesos executor backend (执行器后端)。Spark 软件包可以在任何 Hadoop 可访问的 URI 上托管,包括 HTTP 通过 `http://`[Amazon Simple Storage Service](http://aws.amazon.com/s3) 通过 `s3n://`,或者 HDFS 通过 `hdfs://`
要使用预编译的包:
......@@ -137,9 +137,9 @@ val sc = new SparkContext(conf)
Spark on Mesos 还支持 cluster mode (集群模式),其中 driver 在集群中启动并且 client(客户端)可以在 Mesos Web UI 中找到 driver 的 results。
要使用集群模式,你必须在您的集群中通过 `sbin/start-mesos-dispatcher.sh` 脚本启动 `MesosClusterDispatcher` ,传入 Mesos master URL (例如:mesos://host:5050)。这将启动 `MesosClusterDispatcher` 作为在主机上运行的守护程序。
要使用集群模式,你必须在您的集群中通过 `sbin/start-mesos-dispatcher.sh` 脚本启动 `MesosClusterDispatcher`,传入 Mesos master URL (例如:mesos://host:5050)。这将启动 `MesosClusterDispatcher` 作为在主机上运行的守护程序。
如果您喜欢使用 Marathon 来运行 `MesosClusterDispatcher` ,您需要在 foreground (前台)运行 `MesosClusterDispatcher` (即 `bin/spark-class org.apache.spark.deploy.mesos.MesosClusterDispatcher`)。注意,`MesosClusterDispatcher` 尚不支持 HA 的多个实例。
如果您喜欢使用 Marathon 来运行 `MesosClusterDispatcher`,您需要在 foreground (前台)运行 `MesosClusterDispatcher` (即 `bin/spark-class org.apache.spark.deploy.mesos.MesosClusterDispatcher`)。注意,`MesosClusterDispatcher` 尚不支持 HA 的多个实例。
`MesosClusterDispatcher` 还支持将 recovery state (恢复状态)写入 Zookeeper。这将允许 `MesosClusterDispatcher` 能够在重新启动时恢复所有提交和运行的 containers (容器)。为了启用这个恢复模式,您可以在 spark-env 中通过配置 `spark.deploy.recoveryMode` 来设置 SPARK_DAEMON_JAVA_OPTS 和相关的 spark.deploy.zookeeper.* 配置。有关这些配置的更多信息,请参阅配置 [doc](configurations.html#deploy)
......@@ -163,7 +163,7 @@ Spark on Mesos 还支持 cluster mode (集群模式),其中 driver 在集
请注意,传入到 spark-submit 的 jars 或者 python 文件应该是 Mesos slaves 可访问的 URIs ,因为 Spark driver 不会自动上传本地 jars。
请注意,传入到 spark-submit 的 jars 或者 python 文件应该是 Mesos slaves 可访问的 URIs,因为 Spark driver 不会自动上传本地 jars。
# Mesos 运行模式
......@@ -179,11 +179,11 @@ Spark 可以以两种模式运行 Mesos : “coarse-grained(粗粒度)”
请参阅 [Spark 配置](configuration.html) 页面来了解细节和默认值。
当应用程序启动时,执行器就会高涨,直到达到 `spark.cores.max`。如果您没有设置 `spark.cores.max` ,Spark 应用程序将会保留 Mesos 提供的所有的资源,因此我们当然会敦促您在任何类型的多租户集群上设置此变量,包括运行多个并发 Spark 应用程序的集群。
当应用程序启动时,执行器就会高涨,直到达到 `spark.cores.max`。如果您没有设置 `spark.cores.max`,Spark 应用程序将会保留 Mesos 提供的所有的资源,因此我们当然会敦促您在任何类型的多租户集群上设置此变量,包括运行多个并发 Spark 应用程序的集群。
调度程序将会在提供的 Mesos 上启动执行器循环给它,但是没有 spread guarantees (传播保证),因为 Mesos 不提供这样的保证在提供流上。
在这个模式下 spark 执行器将遵守 port (端口)分配如果这些事由用户提供的。特别是如果用户在 Spark 配置中定义了 `spark.executor.port` 或者 `spark.blockManager.port` ,mesos 调度器将检查有效端口的可用 offers 包含端口号。如果没有这样的 range 可用,它会不启动任何任务。如果用户提供的端口号不受限制,临时端口像往常一样使用。如果用户定义了一个端口,这个端口实现意味着 one task per host (每个主机一个任务)。在未来网络中,isolation 将被支持。
在这个模式下 spark 执行器将遵守 port (端口)分配如果这些事由用户提供的。特别是如果用户在 Spark 配置中定义了 `spark.executor.port` 或者 `spark.blockManager.port`,mesos 调度器将检查有效端口的可用 offers 包含端口号。如果没有这样的 range 可用,它会不启动任何任务。如果用户提供的端口号不受限制,临时端口像往常一样使用。如果用户定义了一个端口,这个端口实现意味着 one task per host (每个主机一个任务)。在未来网络中,isolation 将被支持。
粗粒度模式的好处是开销要低得多,但是在应用程序的整个持续时间内保留 Mesos 资源的代价。要配置您的作业以动态调整资源需求,请参阅 [动态分配](#dynamic-resource-allocation-with-mesos)
......@@ -215,7 +215,7 @@ conf.set("spark.mesos.constraints", "os:centos7;us-east-1:false")
例如,假设将 `spark.mesos.constraints` 设置为 `os:centos7;us-east-1:false` ,然后将检查资源 offers 以查看它们是否满足这两个约束,然后才会被接受以启动新的执行器。
例如,假设将 `spark.mesos.constraints` 设置为 `os:centos7;us-east-1:false`,然后将检查资源 offers 以查看它们是否满足这两个约束,然后才会被接受以启动新的执行器。
# Mesos Docker 支持
......@@ -225,11 +225,11 @@ Spark 可以通过在您的 [SparkConf](configuration.html#spark-properties) 中
需要 Mesos 的 0.20.1 版本或者更高版本。
请注意,默认情况下,如果 agent (代理程序中)的 Mesos 代理已经存在,则 Mesos agents 将不会 pull 图像。如果您使用 mutable image tags (可变图像标签)可以将 `spark.mesos.executor.docker.forcePullImage` 设置为 `true` ,以强制 agent 总是在运行执行器之前拉取 image。Force pulling images (强制拉取图像)仅在 Mesos 0.22 版本及以上版本中可用。
请注意,默认情况下,如果 agent (代理程序中)的 Mesos 代理已经存在,则 Mesos agents 将不会 pull 图像。如果您使用 mutable image tags (可变图像标签)可以将 `spark.mesos.executor.docker.forcePullImage` 设置为 `true`,以强制 agent 总是在运行执行器之前拉取 image。Force pulling images (强制拉取图像)仅在 Mesos 0.22 版本及以上版本中可用。
# 集成 Hadoop 运行
您可以在现有的 Hadoop 集群集成运行 Spark 和 Mesos ,只需要在机器上启动他们作为分开的服务即可。要从 Spark 访问 Hadoop 数据,需要一个完整的 `hdfs://` URL (通常为 `hdfs://&lt;namenode&gt;:9000/path`),但是您可以在 Hadoop Namenode web UI 上找到正确的 URL。
您可以在现有的 Hadoop 集群集成运行 Spark 和 Mesos,只需要在机器上启动他们作为分开的服务即可。要从 Spark 访问 Hadoop 数据,需要一个完整的 `hdfs://` URL (通常为 `hdfs://&lt;namenode&gt;:9000/path`),但是您可以在 Hadoop Namenode web UI 上找到正确的 URL。
此外,还可以在 Mesos 上运行 Hadoop MapReduce,以便在两者之间实现更好的资源隔离和共享。在这种情况下,Mesos 将作为统一的调度程序,将 Core 核心分配给 Hadoop 或 Spark,而不是通过每个节点上的 Linux 调度程序共享资源。请参考 [Hadoop on Mesos](https://github.com/mesos/hadoop)
......@@ -237,7 +237,7 @@ Spark 可以通过在您的 [SparkConf](configuration.html#spark-properties) 中
Mesos 仅支持使用粗粒度模式的动态分配,这可以基于应用程序的统计信息调整执行器的数量。有关一般信息,请参阅 [Dynamic Resource Allocation](job-scheduling.html#dynamic-resource-allocation)
要使用的外部 Shuffle 服务是 Mesos Shuffle 服务。它在 Shuffle 服务之上提供 shuffle 数据清理功能,因为 Mesos 尚不支持通知另一个框架的终止。要启动它,在所有从节点上运 `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh` ,并将 `spark.shuffle.service.enabled` 设置为`true`
要使用的外部 Shuffle 服务是 Mesos Shuffle 服务。它在 Shuffle 服务之上提供 shuffle 数据清理功能,因为 Mesos 尚不支持通知另一个框架的终止。要启动它,在所有从节点上运 `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh`,并将 `spark.shuffle.service.enabled` 设置为`true`
这也可以通过 Marathon,使用唯一的主机约束和以下命令实现 : `bin/spark-class org.apache.spark.deploy.mesos.MesosExternalShuffleService`
......@@ -284,7 +284,7 @@ key1=val1,key2=val2,key3=val3
* 如果没有作为约束的一部分存在的值,则将接受具有相应属性的任何报价(没有值检查)。
|
| `spark.mesos.containerizer` | `docker` | 这只影响 docker containers ,而且必须是 "docker" 或 "mesos"。Mesos 支持两种类型 docker 的 containerizer:"docker" containerizer,和首选 "mesos" containerizer。在这里阅读更多:http://mesos.apache.org/documentation/latest/container-image/ |
| `spark.mesos.containerizer` | `docker` | 这只影响 docker containers,而且必须是 "docker" 或 "mesos"。Mesos 支持两种类型 docker 的 containerizer:"docker" containerizer,和首选 "mesos" containerizer。在这里阅读更多:http://mesos.apache.org/documentation/latest/container-image/ |
| `spark.mesos.driver.webui.url` | `(none)` | 设置 Spark Mesos 驱动程序 Web UI URL 以与框架交互。如果取消设置,它将指向 Spark 的内部 Web UI。 |
| `spark.mesos.driverEnv.[EnvironmentVariableName]` | `(none)` | 这仅影响以群集模式提交的驱动程序。添加由EnvironmentVariableName指定的环境变量驱动程序进程。用户可以指定多个这些设置多个环境变量。 |
| `spark.mesos.dispatcher.webui.url` | `(none)` | 设置 Spark Mesos 分派器 Web UI URL 以与框架交互。如果取消设置,它将指向 Spark 的内部 Web UI。 |
......
......@@ -40,7 +40,7 @@ $ ./bin/spark-shell --master yarn --deploy-mode client
## 添加其他的 JARs
`cluster` 集群模式下,driver 在与客户端不同的机器上运行,因此 `SparkContext.addJar` 将不会立即使用客户端本地的文件运行。要使客户端上的文件可用于 `SparkContext.addJar` ,请在启动命令中使用 `--jars` 选项来包含这些文件。
`cluster` 集群模式下,driver 在与客户端不同的机器上运行,因此 `SparkContext.addJar` 将不会立即使用客户端本地的文件运行。要使客户端上的文件可用于 `SparkContext.addJar`,请在启动命令中使用 `--jars` 选项来包含这些文件。
```
$ ./bin/spark-submit --class my.main.Class \
......@@ -53,9 +53,9 @@ $ ./bin/spark-submit --class my.main.Class \
# 准备
在 YARN 上运行 Spark 需要使用 YARN 支持构建的二进制分布式的 Spark (a binary distribution of Spark)。二进制文件(binary distributions)可以从项目网站的 [下载页面](http://spark.apache.org/downloads.html) 下载。要自己构建 Spark ,请参考 [构建 Spark](building-spark.html)
在 YARN 上运行 Spark 需要使用 YARN 支持构建的二进制分布式的 Spark (a binary distribution of Spark)。二进制文件(binary distributions)可以从项目网站的 [下载页面](http://spark.apache.org/downloads.html) 下载。要自己构建 Spark,请参考 [构建 Spark](building-spark.html)
要使 Spark 运行时 jars 可以从 YARN 端访问,您可以指定 `spark.yarn.archive` 或者 `spark.yarn.jars`。更多详细的信息,请参阅 [Spark 属性](running-on-yarn.html#spark-properties)。如果既没有指定 `spark.yarn.archive` 也没有指定 `spark.yarn.jars` ,Spark 将在 `$SPARK_HOME/jars` 目录下创建一个包含所有 jar 的 zip 文件,并将其上传到 distributed cache(分布式缓存)中。
要使 Spark 运行时 jars 可以从 YARN 端访问,您可以指定 `spark.yarn.archive` 或者 `spark.yarn.jars`。更多详细的信息,请参阅 [Spark 属性](running-on-yarn.html#spark-properties)。如果既没有指定 `spark.yarn.archive` 也没有指定 `spark.yarn.jars`,Spark 将在 `$SPARK_HOME/jars` 目录下创建一个包含所有 jar 的 zip 文件,并将其上传到 distributed cache(分布式缓存)中。
# 配置
......@@ -71,13 +71,13 @@ yarn logs -applicationId <app ID>
将打印来自给定的应用程序的所有容器(containers)的所有的日志文件的内容。你还可以使用 HDFS shell 或者 API 直接在 HDFS 中查看容器日志文件(container log files)。可以通过查看您的 YARN 配置(`yarn.nodemanager.remote-app-log-dir``yarn.nodemanager.remote-app-log-dir-suffix`)找到它们所在的目录。日志还可以在 Spark Web UI 的 “执行程序(Executors)”选项卡下找到。您需要同时运行 Spark 历史记录服务器(Spark history server) 和 MapReduce 历史记录服务器(MapReduce history server),并在 `yarn-site.xm`l 文件中正确配置 `yarn.log.server.url`。Spark 历史记录服务器 UI 上的日志将重定向您到 MapReduce 历史记录服务器以显示聚合日志(aggregated logs)。
当未启用日志聚合时,日志将在每台计算机上的本地保留在 `YARN_APP_LOGS_DIR 目录下`,通常配置为 `/tmp/logs` 或者 `$HADOOP_HOME/logs/userlogs` ,具体取决于 Hadoop 版本和安装。查看容器(container)的日志需要转到包含它们的主机并在此目录中查看它们。子目录根据应用程序 ID (application ID)和 容器 ID (container ID)组织日志文件。日志还可以在 Spark Web UI 的 “执行程序(Executors)”选项卡下找到,并且不需要运行 MapReduce history server。
当未启用日志聚合时,日志将在每台计算机上的本地保留在 `YARN_APP_LOGS_DIR 目录下`,通常配置为 `/tmp/logs` 或者 `$HADOOP_HOME/logs/userlogs`,具体取决于 Hadoop 版本和安装。查看容器(container)的日志需要转到包含它们的主机并在此目录中查看它们。子目录根据应用程序 ID (application ID)和 容器 ID (container ID)组织日志文件。日志还可以在 Spark Web UI 的 “执行程序(Executors)”选项卡下找到,并且不需要运行 MapReduce history server。
要查看每个 container(容器)的启动环境,请将 `yarn.nodemanager.delete.debug-delay-sec` 增加到一个较大的值(例如 `36000`),然后通过 `yarn.nodemanager.local-dirs` 访问应用程序缓存,在容器启动的节点上。此目录包含启动脚本(launch script), JARs ,和用于启动每个容器的所有的环境变量。这个过程对于调试 classpath 问题特别有用。(请注意,启用此功能需要集群设置的管理员权限并且还需要重新启动所有的 node managers,因此这不适用于托管集群)。
要查看每个 container(容器)的启动环境,请将 `yarn.nodemanager.delete.debug-delay-sec` 增加到一个较大的值(例如 `36000`),然后通过 `yarn.nodemanager.local-dirs` 访问应用程序缓存,在容器启动的节点上。此目录包含启动脚本(launch script), JARs,和用于启动每个容器的所有的环境变量。这个过程对于调试 classpath 问题特别有用。(请注意,启用此功能需要集群设置的管理员权限并且还需要重新启动所有的 node managers,因此这不适用于托管集群)。
要为 application master 或者 executors 使用自定义的 log4j 配置,请选择以下选项:
* 使用 `spark-submit` 上传一个自定义的 `log4j.properties` ,通过将 spark-submit 添加到要与应用程序一起上传的文件的 –files 列表中。
* 使用 `spark-submit` 上传一个自定义的 `log4j.properties`,通过将 spark-submit 添加到要与应用程序一起上传的文件的 –files 列表中。
* add `-Dlog4j.configuration=&lt;location of configuration file&gt;` to `spark.driver.extraJavaOptions` (for the driver) or `spark.executor.extraJavaOptions` (for executors). Note that if using a file, the `file:` protocol should be explicitly provided, and the file needs to exist locally on all the nodes.
* 添加 `-Dlog4j.configuration=&lt;配置文件的位置&gt;``spark.driver.extraJavaOptions`(对于驱动程序)或者 containers (对于执行者)。请注意,如果使用文件,文件: 协议(protocol )应该被显式提供,并且该文件需要在所有节点的本地存在。
* 更新 `$SPARK_CONF_DIR/log4j.properties` 文件,并且它将与其他配置一起自动上传。请注意,如果指定了多个选项,其他 2 个选项的优先级高于此选项。
......@@ -99,7 +99,7 @@ To use a custom metrics.properties for the application master and executors, upd
| `spark.yarn.am.memory` | `512m` | 在客户端模式下用于 YARN Application Master 的内存量,与 JVM 内存字符串格式相同 (e.g. `512m``2g`). 在 `cluster` 集群模式下,使用 `spark.driver.memory` 代替.使用小写字母尾后缀,例如. `k``m``g``t`,and `p`,分别表示 kibi-,mebi-,gibi-,tebi- 和 pebibytes,respectively. |
| `spark.yarn.am.cores` | `1` | 在客户端模式下用于 YARN Application Master 的核数。在集群模式下,请改用 `spark.driver.cores` 代替. |
| `spark.yarn.am.waitTime` | `100s` | 在 `cluster 集群` 模式中,YARN Application Master 等待 SparkContext 被初始化的时间. 在 `client 客户端` 模式中,YARN Application Master 等待 driver 连接它的时间. |
| `spark.yarn.submit.file.replication` | 默认的 HDFS 副本数 (通常是 `3`) | 用于应用程序上传到 HDFS 的文件的 HDFS 副本级别。这些包括诸如 Spark jar ,app jar, 和任何分布式缓存 files/archives 之类的东西。 |
| `spark.yarn.submit.file.replication` | 默认的 HDFS 副本数 (通常是 `3`) | 用于应用程序上传到 HDFS 的文件的 HDFS 副本级别。这些包括诸如 Spark jar,app jar, 和任何分布式缓存 files/archives 之类的东西。 |
| `spark.yarn.stagingDir` | 文件系统中当前用户的主目录 | 提交应用程序时使用的临时目录. |
| `spark.yarn.preserve.staging.files` | `false` | 设置为 `true` 以便在作业结束保留暂存文件(Spark jar, app jar,分布式缓存文件),而不是删除它们. |
| `spark.yarn.scheduler.heartbeat.interval-ms` | `3000` | Spark application master 心跳到 YARN ResourceManager 中的间隔(以毫秒为单位)。该值的上限为到期时间间隔的 YARN 配置值的一半,即 `yarn.am.liveness-monitor.expiry-interval-ms`. |
......@@ -115,7 +115,7 @@ To use a custom metrics.properties for the application master and executors, upd
| `spark.yarn.am.memoryOverhead` | AM memory * 0.10,最小值是 384 | 与 `spark.yarn.driver.memoryOverhead` 一样,但是只适用于客户端模式下的 YARN Application Master. |
| `spark.yarn.am.port` | (random) | 被 YARN Application Master 监听的端口。在 YARN 客户端模式下,这用于在网关(gateway)上运行的 Spark 驱动程序(driver)和在 YARN 上运行的 YARN Application Master 之间进行通信。在 YARN 集群模式下,这用于动态执行器(executor)功能,其中它处理从调度程序后端的 kill. |
| `spark.yarn.queue` | `default` | 提交应用程序(application)的 YARN 队列名称. |
| `spark.yarn.jars` | (none) | 包含要分发到 YARN 容器(container)的 Spark 代码的库(libraries)列表。默认情况下, Spark on YARN 将使用本地安装的 Spark jar,但是 Spark jar 也可以在 HDFS 上的一个任何位置都可读的位置。这允许 YARN 将其缓存在节点上,使得它不需要在每次运行应用程序时分发。例如,要指向 HDFS 上的 jars ,请将此配置设置为 `hdfs:///some/path`。允许使用 globs. |
| `spark.yarn.jars` | (none) | 包含要分发到 YARN 容器(container)的 Spark 代码的库(libraries)列表。默认情况下, Spark on YARN 将使用本地安装的 Spark jar,但是 Spark jar 也可以在 HDFS 上的一个任何位置都可读的位置。这允许 YARN 将其缓存在节点上,使得它不需要在每次运行应用程序时分发。例如,要指向 HDFS 上的 jars,请将此配置设置为 `hdfs:///some/path`。允许使用 globs. |
| `spark.yarn.archive` | (none) | 包含所需的 Spark jar 的归档(archive),以分发到 YARN 高速缓存。如果设置,此配置将替换 `spark.yarn.jars`,并且归档(archive)在所有的应用程序(application)的容器(container)中使用。归档(archive)应该在其根目录中包含 jar 文件。与以前的选项一样,归档也可以托管在 HDFS 上以加快文件分发速度. |
| `spark.yarn.access.hadoopFileSystems` | (none) | 应用程序将要访问的安全 Hadoop 文件系统的逗号分隔列表。例如,`spark.yarn.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032, webhdfs://nn3.com:50070`. 应用程序必须能够访问列出的文件系统并且 Kerberos 必须被正确地配置为能够访问它们(在同一个 realm 域或在受信任的 realm 域)。Spark 为每个文件系统获取安全的 tokens 令牌 Spark 应用程序可以访问这些远程 Hadoop 文件系统。Spark `spark.yarn.access.namenodes` 已经过时了,请使用这个来代替. |
| `spark.yarn.appMasterEnv.[EnvironmentVariableName]` | (none) | 将由 `EnvironmentVariableName` 指定的环境变量添加到在 YARN 上启动的 Application Master 进程。用户可以指定其中的多个并设置多个环境变量。在 `cluster 集群` 模式下,这控制 Spark 驱动程序(driver)的环境,在 `client 客户端` 模式下,它只控制执行器(executor)启动器(launcher)的环境. |
......@@ -131,7 +131,7 @@ To use a custom metrics.properties for the application master and executors, upd
| `spark.yarn.tags` | (none) | 以逗号分隔的字符串列表,作为 YARN application 标记中显示的 YARN application 标记传递,可用于在查询 YARN apps 时进行过滤(filtering ). |
| `spark.yarn.keytab` | (none) | 包含上面指定的主体(principal)的 keytab 的文件的完整路径。此 keytab 将通过安全分布式缓存(Secure Distributed Cache)复制到运行 YARN Application Master 的节点,以定期更新 login tickets 和 delegation tokens.(也可以在 "local" master 下工作)。 |
| `spark.yarn.principal` | (none) | 在安全的 HDFS 上运行时用于登录 KDC 的主体(Principal ). (也可以在 "local" master 下工作) |
| `spark.yarn.config.gatewayPath` | (none) | 一个在网关主机(gateway host)(启动 Spark application 的 host)上有效的路径,但对于集群中其他节点中相同资源的路径可能不同。结合 `spark.yarn.config.replacementPath`,这个用于支持具有异构配置的集群,以便 Spark 可以正确启动远程进程.替换路径(replacement path)通常将包含对由 YARN 导出的某些环境变量(以及,因此对于 Spark 容器可见)的引用.例如,如果网关节点(gateway node)在 `/disk1/hadoop` 上安装了 Hadoop 库,并且 Hadoop 安装的位置由 YARN 作为 `HADOOP_HOME` 环境变量导出,则将此值设置为 `/disk1/hadoop` ,将替换路径(replacement path)设置为 `$HADOOP_HOME` 将确保用于启动远程进程的路径正确引用本地 YARN 配置。 |
| `spark.yarn.config.gatewayPath` | (none) | 一个在网关主机(gateway host)(启动 Spark application 的 host)上有效的路径,但对于集群中其他节点中相同资源的路径可能不同。结合 `spark.yarn.config.replacementPath`,这个用于支持具有异构配置的集群,以便 Spark 可以正确启动远程进程.替换路径(replacement path)通常将包含对由 YARN 导出的某些环境变量(以及,因此对于 Spark 容器可见)的引用.例如,如果网关节点(gateway node)在 `/disk1/hadoop` 上安装了 Hadoop 库,并且 Hadoop 安装的位置由 YARN 作为 `HADOOP_HOME` 环境变量导出,则将此值设置为 `/disk1/hadoop`,将替换路径(replacement path)设置为 `$HADOOP_HOME` 将确保用于启动远程进程的路径正确引用本地 YARN 配置。 |
| `spark.yarn.config.replacementPath` | (none) | 请看 `spark.yarn.config.gatewayPath`. |
| `spark.yarn.security.credentials.${service}.enabled` | `true` | 控制是否在启用安全性时获取服务的 credentials(凭据)。默认情况下,在这些服务时,将检索所有支持的服务的凭据配置,但如果与某些方式冲突,可以禁用该行为. 详情请见 [在安全的集群中运行](running-on-yarn.html#running-in-a-secure-cluster) |
| `spark.yarn.rolledLog.includePattern` | (none) | Java Regex 过滤与定义的包含模式匹配的日志文件,这些日志文件将以滚动的方式进行聚合。这将与 YARN 的滚动日志聚合一起使用,在 yarn-site.xml 文件中配置 `yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds` 以在 YARN 方面启用此功能. 此功能只能与 Hadoop 2.6.4+ 一起使用。Spark log4j appender 需要更改才能使用 FileAppender 或其他 appender 可以处理正在运行的文件被删除。基于在 log4j 配置中配置的文件名(如 spark.log)上,用户应该设置正则表达式(spark *)包含需要聚合的所有日志文件。 |
......
......@@ -40,7 +40,7 @@ Spark 提供了三个位置来配置系统:
Spark 属性控制大多数应用程序设置,并为每个应用程序单独配置. 这些属性可以直接在 [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) 上设置并传递给您的 `SparkContext` . `SparkConf` 可以让你配置一些常见的属性(例如 master URL 和应用程序名称),以及通过 `set()` 方法来配置任意 key-value pairs (键值对). 例如,我们可以使用两个线程初始化一个应用程序,如下所示:
请注意,我们运行 local[2] ,意思是两个线程 - 代表 “最小” 并行性,这可以帮助检测在只存在于分布式环境中运行时的错误.
请注意,我们运行 local[2],意思是两个线程 - 代表 “最小” 并行性,这可以帮助检测在只存在于分布式环境中运行时的错误.
......@@ -109,11 +109,11 @@ spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
```
指定为 flags (标志)或属性文件中的任何值都将传递给应用程序并与通过 SparkConf 指定的那些值 merge (合并). 属性直接在 SparkConf 上设置采取最高优先级,然后 flags (标志)传递给 `spark-submit``spark-shell` ,然后选项在 `spark-defaults.conf` 文件中. 自从 Spark 版本的早些时候,一些 configuration keys (配置键)已被重命名 ; 在这种情况下,旧的 key names (键名)仍然被接受,但要比较新的 key 优先级都要低一些.
指定为 flags (标志)或属性文件中的任何值都将传递给应用程序并与通过 SparkConf 指定的那些值 merge (合并). 属性直接在 SparkConf 上设置采取最高优先级,然后 flags (标志)传递给 `spark-submit``spark-shell`,然后选项在 `spark-defaults.conf` 文件中. 自从 Spark 版本的早些时候,一些 configuration keys (配置键)已被重命名 ; 在这种情况下,旧的 key names (键名)仍然被接受,但要比较新的 key 优先级都要低一些.
## 查看 Spark 属性
在应用程序的 web UI `http://&lt;driver&gt;:4040` 中,“Environment” tab (“环境”选项卡)中列出了 Spark 的属性. 这是一个检查您是否正确设置了您的属性的一个非常有用的地方. 注意,只有显示地通过 `spark-defaults.conf` `SparkConf` 或者命令行设置的值将会出现. 对于所有其他配置属性,您可以认为使用的都是默认值.
在应用程序的 web UI `http://&lt;driver&gt;:4040` 中,“Environment” tab (“环境”选项卡)中列出了 Spark 的属性. 这是一个检查您是否正确设置了您的属性的一个非常有用的地方. 注意,只有显示地通过 `spark-defaults.conf``SparkConf` 或者命令行设置的值将会出现. 对于所有其他配置属性,您可以认为使用的都是默认值.
## 可用属性
......@@ -299,7 +299,7 @@ It also allows a different address from the local one to be advertised to execut
| Property Name (属性名称) | Default (默认值) | Meaning (含义) |
| --- | --- | --- |
| `spark.cores.max` | (not set) | 当以 “coarse-grained(粗粒度)” 共享模式在 [standalone deploy cluster](spark-standalone.html)[Mesos cluster in "coarse-grained" sharing mode](running-on-mesos.html#mesos-run-modes) 上运行时,从集群(而不是每台计算机)请求应用程序的最大 CPU 内核数量. 如果未设置,默认值将是 Spar k的 standalone deploy 管理器上的 `spark.deploy.defaultCores` ,或者 Mesos上的无限(所有可用核心). |
| `spark.cores.max` | (not set) | 当以 “coarse-grained(粗粒度)” 共享模式在 [standalone deploy cluster](spark-standalone.html)[Mesos cluster in "coarse-grained" sharing mode](running-on-mesos.html#mesos-run-modes) 上运行时,从集群(而不是每台计算机)请求应用程序的最大 CPU 内核数量. 如果未设置,默认值将是 Spar k的 standalone deploy 管理器上的 `spark.deploy.defaultCores`,或者 Mesos上的无限(所有可用核心). |
| `spark.locality.wait` | 3s | 等待启动本地数据任务多长时间,然后在较少本地节点上放弃并启动它. 相同的等待将用于跨越多个地点级别(process-local,node-local,rack-local 等所有). 也可以通过设置 `spark.locality.wait.node` 等来自定义每个级别的等待时间. 如果任务很长并且局部性较差,则应该增加此设置,但是默认值通常很好. |
| `spark.locality.wait.node` | spark.locality.wait | 自定义 node locality 等待时间. 例如,您可以将其设置为 0 以跳过 node locality,并立即搜索机架位置(如果群集具有机架信息). |
| `spark.locality.wait.process` | spark.locality.wait | 自定义 process locality 等待时间. 这会影响尝试访问特定执行程序进程中的缓存数据的任务. |
......@@ -317,7 +317,7 @@ It also allows a different address from the local one to be advertised to execut
| `spark.blacklist.application.maxFailedTasksPerExecutor` | 2 | (Experimental) How many different tasks must fail on one executor, in successful task sets, before the executor is blacklisted for the entire application. Blacklisted executors will be automatically added back to the pool of available resources after the timeout specified by `spark.blacklist.timeout`. Note that with dynamic allocation, though, the executors may get marked as idle and be reclaimed by the cluster manager. |
| `spark.blacklist.application.maxFailedExecutorsPerNode` | 2 | (Experimental) How many different executors must be blacklisted for the entire application, before the node is blacklisted for the entire application. Blacklisted nodes will be automatically added back to the pool of available resources after the timeout specified by `spark.blacklist.timeout`. Note that with dynamic allocation, though, the executors on the node may get marked as idle and be reclaimed by the cluster manager. |
| `spark.blacklist.killBlacklistedExecutors` | false | (Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create, executors when they are blacklisted. Note that, when an entire node is added to the blacklist, all of the executors on that node will be killed. |
| `spark.speculation` | false | 如果设置为 "true" ,则执行任务的推测执行. 这意味着如果一个或多个任务在一个阶段中运行缓慢,则将重新启动它们. |
| `spark.speculation` | false | 如果设置为 "true",则执行任务的推测执行. 这意味着如果一个或多个任务在一个阶段中运行缓慢,则将重新启动它们. |
| `spark.speculation.interval` | 100ms | Spark 检查要推测的任务的时间间隔. |
| `spark.speculation.multiplier` | 1.5 | 一个任务的速度可以比推测的平均值慢多少倍. |
| `spark.speculation.quantile` | 0.75 | 对特定阶段启用推测之前必须完成的任务的分数. |
......@@ -447,7 +447,7 @@ showDF(properties, numRows = 200, truncate = FALSE)
| `spark.streaming.receiver.maxRate` | not set | 每秒钟每个 receiver 将接收的数据的最大速率(每秒钟的记录数目). 有效的情况下,每个流每秒将最多消耗这个数目的记录. 设置这个配置为 0 或者 -1 将会不作限制. 细节参见 Spark Streaming 编程指南的 [deployment guide](streaming-programming-guide.html#deploying-applications) 一节. |
| `spark.streaming.receiver.writeAheadLog.enable` | false | 为 receiver 启用 write ahead logs. 所有通过接收器接收输入的数据将被保存到 write ahead logs,以便它在驱动程序故障后进行恢复. 见星火流编程指南部署指南了解更多详情. 细节参见 Spark Streaming 编程指南的 [deployment guide](streaming-programming-guide.html#deploying-applications) 一节. |
| `spark.streaming.unpersist` | true | 强制通过 Spark Streaming 生成并持久化的 RDD 自动从 Spark 内存中非持久化. 通过 Spark Streaming 接收的原始输入数据也将清除. 设置这个属性为 false 允许流应用程序访问原始数据和持久化 RDD,因为它们没有被自动清除. 但是它会造成更高的内存花费. |
| `spark.streaming.stopGracefullyOnShutdown` | false | 如果为 `true` ,Spark 将 gracefully (缓慢地)关闭在 JVM 运行的 StreamingContext ,而非立即执行. |
| `spark.streaming.stopGracefullyOnShutdown` | false | 如果为 `true`,Spark 将 gracefully (缓慢地)关闭在 JVM 运行的 StreamingContext,而非立即执行. |
| `spark.streaming.kafka.maxRatePerPartition` | not set | 在使用新的 Kafka direct stream API 时,从每个 kafka 分区读到的最大速率(每秒的记录数目). 详见 [Kafka Integration guide](streaming-kafka-integration.html) . |
| `spark.streaming.kafka.maxRetries` | 1 | driver 连续重试的最大次数,以此找到每个分区 leader 的最近的(latest)的偏移量(默认为 1 意味着 driver 将尝试最多两次). 仅应用于新的 kafka direct stream API. |
| `spark.streaming.ui.retainedBatches` | 1000 | 在垃圾回收之前,Spark Streaming UI 和状态API 所能记得的 批处理(batches)数量. |
......@@ -476,7 +476,7 @@ showDF(properties, numRows = 200, truncate = FALSE)
| Property Name (属性名称) | Default (默认值) | Meaning (含义) |
| --- | --- | --- |
| `spark.deploy.recoveryMode` | NONE | 集群模式下,Spark jobs 执行失败或者重启时,恢复提交 Spark jobs 的恢复模式设定. |
| `spark.deploy.zookeeper.url` | None | 当 `spark.deploy.recoveryMode` 被设定为 ZOOKEEPER ,这一配置被用来连接 zookeeper URL. |
| `spark.deploy.zookeeper.url` | None | 当 `spark.deploy.recoveryMode` 被设定为 ZOOKEEPER,这一配置被用来连接 zookeeper URL. |
| `spark.deploy.zookeeper.dir` | None | 当 `spark.deploy.recoveryMode` 被设定为 ZOOKEEPER,这一配置被用来设定 zookeeper 目录为 store recovery state. |
### Cluster Managers (集群管理器)
......@@ -507,7 +507,7 @@ Spark 中的每个集群管理器都有额外的配置选项,这些配置可
因为 `spark-env.sh` 是 shell 脚本,一些可以通过程序的方式来设置,比如你可以通过特定的网络接口来计算 `SPARK_LOCAL_IP` .
注意 : 当以 `cluster` mode (集群模式)运行 Spark on YARN 时 ,环境变量需要通过在您的 `conf/spark-defaults.conf` 文件中 `spark.yarn.appMasterEnv.[EnvironmentVariableName]` 来设定. `cluster` mode (集群模式)下,`spark-env.sh` 中设定的环境变量将不会在 YARN Application Master 过程中反应出来. 详见 [YARN-related Spark Properties](running-on-yarn.html#spark-properties).
注意 : 当以 `cluster` mode (集群模式)运行 Spark on YARN 时,环境变量需要通过在您的 `conf/spark-defaults.conf` 文件中 `spark.yarn.appMasterEnv.[EnvironmentVariableName]` 来设定. `cluster` mode (集群模式)下,`spark-env.sh` 中设定的环境变量将不会在 YARN Application Master 过程中反应出来. 详见 [YARN-related Spark Properties](running-on-yarn.html#spark-properties).
# Configuring Logging (配置 Logging)
......@@ -515,7 +515,7 @@ Spark 用 [log4j](http://logging.apache.org/log4j/) 生成日志,你可以通
# Overriding configuration directory (覆盖配置目录)
如果你想指定不同的配置目录,而不是默认的 “SPARK_HOME/conf” ,你可以设置 SPARK_CONF_DIR. Spark 将从这一目录下读取文件( spark-defaults.conf,spark-env.sh,log4j.properties 等)
如果你想指定不同的配置目录,而不是默认的 “SPARK_HOME/conf”,你可以设置 SPARK_CONF_DIR. Spark 将从这一目录下读取文件( spark-defaults.conf,spark-env.sh,log4j.properties 等)
# Inheriting Hadoop Cluster Configuration (继承 Hadoop 集群配置)
......
......@@ -23,7 +23,7 @@
./sbin/start-history-server.sh
```
默认情况下,会在 `http://&lt;server-url&gt;:18080` 创建一个Web 界面 ,显示未完成、完成以及其他尝试的任务信息。
默认情况下,会在 `http://&lt;server-url&gt;:18080` 创建一个Web 界面,显示未完成、完成以及其他尝试的任务信息。
当使用file-system提供程序类(见下面 `spark.history.provider`)时,基本日志记录目录必须在`spark.history.fs.logDirectory`配置选项中提供,并且应包含每个代表应用程序的事件日志的子目录。
......@@ -171,7 +171,7 @@ Spark还支持由于许可限制而不包含在默认构建中的Ganglia接收
* `GangliaSink`: 向Ganglia节点或 multicast组发送metrics。
要安装 `GangliaSink` ,您需要执行Spark的自定义构建。_**请注意,通过嵌入此库,您将包括 [LGPL](http://www.gnu.org/copyleft/lesser.html)-licensed Spark包中的代码**_。对于sbt用户,在构建之前设置 `SPARK_GANGLIA_LGPL`环境变量。对于Maven用户,启用 `-Pspark-ganglia-lgpl`配置文件。除了修改集群的Spark构建用户,应用程序还需要链接到 `spark-ganglia-lgpl`工件。
要安装 `GangliaSink`,您需要执行Spark的自定义构建。_**请注意,通过嵌入此库,您将包括 [LGPL](http://www.gnu.org/copyleft/lesser.html)-licensed Spark包中的代码**_。对于sbt用户,在构建之前设置 `SPARK_GANGLIA_LGPL`环境变量。对于Maven用户,启用 `-Pspark-ganglia-lgpl`配置文件。除了修改集群的Spark构建用户,应用程序还需要链接到 `spark-ganglia-lgpl`工件。
metrics配置文件的语法在示例配置文件 `$SPARK_HOME/conf/metrics.properties.template`中定义。
......
......@@ -14,7 +14,7 @@
* [数据本地化](#数据本地化)
* [概要](#概要)
由于大多数 Spark 计算的内存性质, Spark 程序可能由集群中的任何资源( CPU ,网络带宽或内存)导致瓶颈。通常情况下,如果数据有合适的内存,瓶颈就是网络带宽,但有时您还需要进行一些调整,例如 [以序列化形式存储 RDD](programming-guide.html#rdd-persistence) 来减少内存的使用。本指南将涵盖两个主要的主题:数据序列化,这对于良好的网络性能至关重要,并且还可以减少内存使用和内存优化。我们选几个较小的主题进行展开。
由于大多数 Spark 计算的内存性质, Spark 程序可能由集群中的任何资源( CPU,网络带宽或内存)导致瓶颈。通常情况下,如果数据有合适的内存,瓶颈就是网络带宽,但有时您还需要进行一些调整,例如 [以序列化形式存储 RDD](programming-guide.html#rdd-persistence) 来减少内存的使用。本指南将涵盖两个主要的主题:数据序列化,这对于良好的网络性能至关重要,并且还可以减少内存使用和内存优化。我们选几个较小的主题进行展开。
# 数据序列化
......@@ -51,9 +51,9 @@ val sc = new SparkContext(conf)
默认情况下, Java 对象可以快速访问,但可以轻松地消耗比其字段中的 “raw” 数据多2-5倍的空间。这是由于以下几个原因:
* 每个不同的 Java 对象都有一个 “object header” ,它大约是16个字节,包含一个指向它的类的指针。对于一个数据很少的对象(比如说一个`Int`字段),这可以比数据大。
* 每个不同的 Java 对象都有一个 “object header”,它大约是16个字节,包含一个指向它的类的指针。对于一个数据很少的对象(比如说一个`Int`字段),这可以比数据大。
* Java `String` 在原始字符串数据上具有大约40字节的开销(因为它们存储在 `Char` 数组中并保留额外的数据,例如长度),并且由于 UTF-16 的内部使用而将每个字符存储为_两个_字节 `String` 编码。因此,一个10个字符的字符串可以容易地消耗60个字节。
* 公共收集类,例如 `HashMap``LinkedList` ,使用链接的数据结构,其中每个条目(例如: `Map.Entry` )存在”包装器”对象。该对象不仅具有 header ,还包括指针(通常为8个字节)到列表中的下一个对象。
* 公共收集类,例如 `HashMap``LinkedList`,使用链接的数据结构,其中每个条目(例如: `Map.Entry` )存在”包装器”对象。该对象不仅具有 header,还包括指针(通常为8个字节)到列表中的下一个对象。
* 原始类型的集合通常将它们存储为”盒装”对象,例如: `java.lang.Integer`
本节将从 Spark 的内存管理概述开始,然后讨论用户可以采取的具体策略,以便在他/她的应用程序中更有效地使用内存。具体来说,我们将描述如何确定对象的内存使用情况,以及如何改进数据结构,或通过以串行格式存储数据。然后我们将介绍调整 Spark 的缓存大小和 Java 垃圾回收器。
......@@ -73,7 +73,7 @@ Spark 中的内存使用大部分属于两类:执行和存储。执行存储
## 确定内存消耗
大小数据集所需的内存消耗量的最佳方式是创建 RDD ,将其放入缓存中,并查看 Web UI 中的“存储”页面。该页面将告诉您 RDD 占用多少内存。
大小数据集所需的内存消耗量的最佳方式是创建 RDD,将其放入缓存中,并查看 Web UI 中的“存储”页面。该页面将告诉您 RDD 占用多少内存。
为了估计特定对象的内存消耗,使用 `SizeEstimator``estimate` 方法这是用于与不同的数据布局试验修剪内存使用情况,以及确定的空间的广播变量将占据每个执行器堆的量是有用的。
......@@ -84,15 +84,15 @@ Spark 中的内存使用大部分属于两类:执行和存储。执行存储
1. 将数据结构设计为偏好对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如: `HashMap` )。该 [fastutil](http://fastutil.di.unimi.it) 库提供方便的集合类基本类型是与 Java 标准库兼容。
2. 尽可能避免使用很多小对象和指针的嵌套结构。
3. 考虑使用数字 ID 或枚举对象而不是键的字符串。
4. 如果您的 RAM 小于32 GB,请设置 JVM 标志 `-XX:+UseCompressedOops` ,使指针为4个字节而不是8个字节。您可以添加这些选项 [`spark-env.sh`](configuration.html#environment-variables)
4. 如果您的 RAM 小于32 GB,请设置 JVM 标志 `-XX:+UseCompressedOops`,使指针为4个字节而不是8个字节。您可以添加这些选项 [`spark-env.sh`](configuration.html#environment-variables)
## 序列化 RDD 存储
当您的对象仍然太大而无法有效存储,尽管这种调整,减少内存使用的一个更简单的方法是以序列化形式存储它们,使用 [RDD 持久性 API](programming-guide.html#rdd-persistence) 中的序列化 StorageLevel ,例如: `MEMORY_ONLY_SER`。Spark 将会将每个 RDD 分区存储为一个大字节数组。以序列化形式存储数据的唯一缺点是访问时间较短,因为必须对每个对象进行反序列化。如果您想以序列化形式缓存数据,我们强烈建议[使用 Kryo](#data-serialization) ,因为它导致比 Java 序列化更小的尺寸(而且肯定比原 Java 对象)更小。
当您的对象仍然太大而无法有效存储,尽管这种调整,减少内存使用的一个更简单的方法是以序列化形式存储它们,使用 [RDD 持久性 API](programming-guide.html#rdd-persistence) 中的序列化 StorageLevel,例如: `MEMORY_ONLY_SER`。Spark 将会将每个 RDD 分区存储为一个大字节数组。以序列化形式存储数据的唯一缺点是访问时间较短,因为必须对每个对象进行反序列化。如果您想以序列化形式缓存数据,我们强烈建议[使用 Kryo](#data-serialization),因为它导致比 Java 序列化更小的尺寸(而且肯定比原 Java 对象)更小。
## 垃圾收集调整
当您的程序存储的 RDD 有很大的”流失”时, JVM 垃圾收集可能是一个问题。(程序中通常没有问题,只读一次 RDD ,然后在其上运行许多操作)。当 Java 需要驱逐旧对象为新的对象腾出空间时,需要跟踪所有 Java 对象并找到未使用的。要记住的要点是,垃圾收集的成本与 Java 对象的数量成正比,因此使用较少对象的数据结构(例如: `Ints` 数组,而不是 `LinkedList` )大大降低了此成本。一个更好的方法是如上所述以序列化形式持久化对象:现在每个 RDD 分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 是一个问题,首先要使用[序列化缓存](#serialized-rdd-storage)
当您的程序存储的 RDD 有很大的”流失”时, JVM 垃圾收集可能是一个问题。(程序中通常没有问题,只读一次 RDD,然后在其上运行许多操作)。当 Java 需要驱逐旧对象为新的对象腾出空间时,需要跟踪所有 Java 对象并找到未使用的。要记住的要点是,垃圾收集的成本与 Java 对象的数量成正比,因此使用较少对象的数据结构(例如: `Ints` 数组,而不是 `LinkedList` )大大降低了此成本。一个更好的方法是如上所述以序列化形式持久化对象:现在每个 RDD 分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 是一个问题,首先要使用[序列化缓存](#serialized-rdd-storage)
由于任务的工作记忆(运行任务所需的空间)和缓存在节点上的 RDD 之间的干扰, GC 也可能是一个问题。我们将讨论如何控制分配给RDD缓存的空间来减轻这一点。
......@@ -106,15 +106,15 @@ GC 调整的第一步是收集关于垃圾收集发生频率和GC花费的时间
* Java堆空间分为两个区域 Young 和 Old。Young 一代的目的是持有短命的物体,而 Old 一代的目标是使用寿命更长的物体。
* Young 一代进一步分为三个区域[ Eden , Survivor1 , Survivor2 ]。
* Young 一代进一步分为三个区域[ Eden, Survivor1, Survivor2 ]。
* 垃圾收集过程的简化说明:当 Eden 已满时, Eden 上运行了一个小型 GC ,并将 Eden 和 Survivor1 中存在的对象复制到 Survivor2。幸存者地区被交换。如果一个对象足够老,或者 Survivor2 已满,则会移动到 Old。最后,当 Old 接近满时,一个完整的 GC 被调用。
* 垃圾收集过程的简化说明:当 Eden 已满时, Eden 上运行了一个小型 GC,并将 Eden 和 Survivor1 中存在的对象复制到 Survivor2。幸存者地区被交换。如果一个对象足够老,或者 Survivor2 已满,则会移动到 Old。最后,当 Old 接近满时,一个完整的 GC 被调用。
Spark 中 GC 调优的目的是确保只有长寿命的 RDD 存储在 Old 版本中,并且 Young 版本的大小足够存储短命期的对象。这将有助于避免使用完整的 GC 来收集任务执行期间创建的临时对象。可能有用的一些步骤是:
* 通过收集 GC 统计信息来检查垃圾收集是否太多。如果在任务完成之前多次调用完整的 GC ,这意味着没有足够的可用于执行任务的内存。
* 通过收集 GC 统计信息来检查垃圾收集是否太多。如果在任务完成之前多次调用完整的 GC,这意味着没有足够的可用于执行任务的内存。
* 如果太小的集合太多,而不是很多主要的 GC ,为 Eden 分配更多的内存将会有所帮助。您可以将 Eden 的大小设置为对每个任务需要多少内存的估计。如果确定 Eden 的大小 `E` ,那么您可以使用该选项设置年轻一代的大小 `-Xmn=4/3*E`。(按比例增加4/3是考虑幸存者地区使用的空间。)
* 如果太小的集合太多,而不是很多主要的 GC,为 Eden 分配更多的内存将会有所帮助。您可以将 Eden 的大小设置为对每个任务需要多少内存的估计。如果确定 Eden 的大小 `E`,那么您可以使用该选项设置年轻一代的大小 `-Xmn=4/3*E`。(按比例增加4/3是考虑幸存者地区使用的空间。)
* 在打印的 GC 统计信息中,如果 OldGen 接近于满,则通过降低减少用于缓存的内存量 `spark.memory.fraction` ; 缓存较少的对象比减慢任务执行更好。或者,考虑减少年轻一代的大小。这意味着 `-Xmn` 如果您将其设置为如上所述降低。如果没有,请尝试更改 JVM `NewRatio` 参数的值。许多 JVM 默认为2,这意味着 Old 版本占据堆栈的2/3。它应该足够大,使得该分数超过 `spark.memory.fraction`
......@@ -132,11 +132,11 @@ Spark 中 GC 调优的目的是确保只有长寿命的 RDD 存储在 Old 版本
## 并行度水平
集群不会被充分利用,除非您将每个操作的并行级别设置得足够高。自动星火设置的 “地图” 任务的数量根据其大小对每个文件运行(尽管你可以通过可选的参数来控制它 `SparkContext.textFile` ,等等),以及用于分布式”减少”操作,如: `groupByKey``reduceByKey` ,它采用了最大父 RDD 的分区数。您可以将并行级别作为第二个参数传递(请参阅 [`spark.PairRDDFunctions`](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) 文档),或者将 config 属性设置 `spark.default.parallelism` 为更改默认值。一般来说,我们建议您的群集中每个 CPU 内核有2-3个任务。
集群不会被充分利用,除非您将每个操作的并行级别设置得足够高。自动星火设置的 “地图” 任务的数量根据其大小对每个文件运行(尽管你可以通过可选的参数来控制它 `SparkContext.textFile`,等等),以及用于分布式”减少”操作,如: `groupByKey``reduceByKey`,它采用了最大父 RDD 的分区数。您可以将并行级别作为第二个参数传递(请参阅 [`spark.PairRDDFunctions`](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) 文档),或者将 config 属性设置 `spark.default.parallelism` 为更改默认值。一般来说,我们建议您的群集中每个 CPU 内核有2-3个任务。
## 减少任务的内存使用
有时,您将得到一个 OutOfMemoryError ,因为您的 RDD 不适合内存,而是因为您的其中一个任务的工作集(如其中一个 reduce 任务`groupByKey`)太大。Spark 的 shuffle 操作(`sortByKey``groupByKey``reduceByKey``join`,等)建立每个任务中的哈希表来进行分组,而这往往是很大的。这里最简单的解决方案是_增加并行级别_,以便每个任务的输入集都更小。Spark 可以有效地支持短达200 ms 的任务,因为它可以将多个任务中的一个执行者JVM重用,并且任务启动成本低,因此您可以将并行级别安全地提高到集群中的核心数量。
有时,您将得到一个 OutOfMemoryError,因为您的 RDD 不适合内存,而是因为您的其中一个任务的工作集(如其中一个 reduce 任务`groupByKey`)太大。Spark 的 shuffle 操作(`sortByKey``groupByKey``reduceByKey``join`,等)建立每个任务中的哈希表来进行分组,而这往往是很大的。这里最简单的解决方案是_增加并行级别_,以便每个任务的输入集都更小。Spark 可以有效地支持短达200 ms 的任务,因为它可以将多个任务中的一个执行者JVM重用,并且任务启动成本低,因此您可以将并行级别安全地提高到集群中的核心数量。
## 广播大的变量
......
......@@ -69,7 +69,7 @@ Spark会分轮次来申请执行器。实际的资源申请,会在任务挂起
这种需求对于混洗操作尤其重要。混洗过程中,Spark 执行器首先将 map 输出写到本地磁盘,同时执行器本身又是一个文件服务器,这样其他执行器就能够通过该执行器获得对应的 map 结果数据。一旦有某些任务执行时间过长,动态分配有可能在混洗结束前移除任务异常的执行器,而这些被移除的执行器对应的数据将会被重新计算,但这些重算其实是不必要的。
要解决这一问题,就需要用到 external shuffle service ,该服务在 Spark 1.2 引入。该服务在每个节点上都会启动一个不依赖于任何 Spark 应用或执行器的独立进程。一旦该服务启用,Spark 执行器不再从各个执行器上获取 shuffle 文件,转而从这个 service 获取。这意味着,任何执行器输出的混洗状态数据都可能存留时间比对应的执行器进程还长。
要解决这一问题,就需要用到 external shuffle service,该服务在 Spark 1.2 引入。该服务在每个节点上都会启动一个不依赖于任何 Spark 应用或执行器的独立进程。一旦该服务启用,Spark 执行器不再从各个执行器上获取 shuffle 文件,转而从这个 service 获取。这意味着,任何执行器输出的混洗状态数据都可能存留时间比对应的执行器进程还长。
除了混洗文件之外,执行器也会在磁盘或者内存中缓存数。一旦执行器被移除,其缓存数据将无法访问。这个问题目前还没有解决。或许在未来的版本中,可能会采用外部混洗服务类似的方法,将缓存数据保存在堆外存储中以解决这一问题。
......
......@@ -22,7 +22,7 @@ Spark 开发者都会遇到一个常见问题,那就是如何为 Spark 配置
您需要多少内存取决于您的应用程序。如果您需要确定的应用程序中某个特定数据集占用内存的大小,您可以把这个数据集加载到一个 Spark RDD 中,然后在 Spark 监控 UI 页面(`http://&lt;driver-node&gt;:4040`)中的 Storage 选项卡下查看它在内存中的大小。需要注意的是,存储级别和序列化格式对内存使用量有很大的影响 - 如何减少内存使用量的建议,请参阅[调优指南](tuning.html)
最后,需要注意的是 Java 虚拟机在超过 200GB 的 RAM 时表现得并不好。如果您购置的机器有比这更多的 RAM ,您可以在每个节点上运行多个 Worker 的 JVM 实例。在 Spark 的 [standalone mode](spark-standalone.html) 下,您可以通过 `conf/spark-env.sh` 中的 `SPARK_WORKER_INSTANCES``SPARK_WORKER_CORES` 两个参数来分别设置每个节点的 Worker 数量和每个 Worker 使用的 Core 数量。
最后,需要注意的是 Java 虚拟机在超过 200GB 的 RAM 时表现得并不好。如果您购置的机器有比这更多的 RAM,您可以在每个节点上运行多个 Worker 的 JVM 实例。在 Spark 的 [standalone mode](spark-standalone.html) 下,您可以通过 `conf/spark-env.sh` 中的 `SPARK_WORKER_INSTANCES``SPARK_WORKER_CORES` 两个参数来分别设置每个节点的 Worker 数量和每个 Worker 使用的 Core 数量。
# 网络
......
......@@ -49,12 +49,12 @@ export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
**注意:**
* 如果使用 `build/mvn` 但是没有设置 `MAVEN_OPTS` ,那么脚本会自动地将以上选项添加到 `MAVEN_OPTS` 环境变量。
* 即使不适用 `build/mvn` ,Spark 构建的 `test` 阶段也会自动将这些选项加入到 `MAVEN_OPTS` 中。
* 如果使用 `build/mvn` 但是没有设置 `MAVEN_OPTS`,那么脚本会自动地将以上选项添加到 `MAVEN_OPTS` 环境变量。
* 即使不适用 `build/mvn`,Spark 构建的 `test` 阶段也会自动将这些选项加入到 `MAVEN_OPTS` 中。
### build/mvn
Spark 现在与一个独立的 Maven 安装包封装到了一起,以便从位于 `build/` 目录下的源代码构建和部署 Spark。该脚本将自动下载并设置所有必要的构建需求 ([Maven](https://maven.apache.org/) [Scala](http://www.scala-lang.org/)[Zinc](https://github.com/typesafehub/zinc)) 到本身的 `build/` 目录下。其尊重任何已经存在的 `mvn` 二进制文件,但是将会 pull down 其自己的 Scala 和 Zinc 的副本,不管是否满足适当的版本需求。`build/mvn` 的执行作为传递到 `mvn` 的调用,允许从以前的构建方法轻松转换。例如,可以像下边这样构建 Spark 的版本:
Spark 现在与一个独立的 Maven 安装包封装到了一起,以便从位于 `build/` 目录下的源代码构建和部署 Spark。该脚本将自动下载并设置所有必要的构建需求 ([Maven](https://maven.apache.org/)[Scala](http://www.scala-lang.org/)[Zinc](https://github.com/typesafehub/zinc)) 到本身的 `build/` 目录下。其尊重任何已经存在的 `mvn` 二进制文件,但是将会 pull down 其自己的 Scala 和 Zinc 的副本,不管是否满足适当的版本需求。`build/mvn` 的执行作为传递到 `mvn` 的调用,允许从以前的构建方法轻松转换。例如,可以像下边这样构建 Spark 的版本:
```
./build/mvn -DskipTests clean package
......@@ -116,7 +116,7 @@ Spark 现在与一个独立的 Maven 安装包封装到了一起,以便从位
./build/mvn -Pyarn -Dscala-2.10 -DskipTests clean package
```
请注意,Scala 2.10 的支持已经不再适用于 Spark 2.1.0 ,并且可能在 Spark 2.2.0 中被删除。
请注意,Scala 2.10 的支持已经不再适用于 Spark 2.1.0,并且可能在 Spark 2.2.0 中被删除。
## 单独构建子模块
......
......@@ -7,7 +7,7 @@
* [独立的应用](#独立的应用)
* [快速跳转](#快速跳转)
本教程提供了如何使用 Spark 的快速入门介绍。首先通过运行 Spark 交互式的 shell(在 Python 或 Scala 中)来介绍 API,然后展示如何使用 Java ,Scala 和 Python 来编写应用程序。
本教程提供了如何使用 Spark 的快速入门介绍。首先通过运行 Spark 交互式的 shell(在 Python 或 Scala 中)来介绍 API,然后展示如何使用 Java,Scala 和 Python 来编写应用程序。
为了继续阅读本指南,首先从 [Spark 官网](http://spark.apache.org/downloads.html) 下载 Spark 的发行包。因为我们将不使用 HDFS,所以你可以下载一个任何 Hadoop 版本的软件包。
......
......@@ -303,7 +303,7 @@ Once created, the distributed dataset (`distData`) can be operated on in paralle
Spark 可以从 Hadoop 所支持的任何存储源中创建 distributed dataset(分布式数据集),包括本地文件系统,HDFS,Cassandra,HBase,[Amazon S3](http://wiki.apache.org/hadoop/AmazonS3) 等等。Spark 支持文本文件,[SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html),以及任何其它的 Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html)
可以使用 `SparkContext``textFile` 方法来创建文本文件的 RDD。此方法需要一个文件的 URI(计算机上的本地路径 `hdfs://``s3n://` 等等的 URI),并且读取它们作为一个 lines(行)的集合。下面是一个调用示例:
可以使用 `SparkContext``textFile` 方法来创建文本文件的 RDD。此方法需要一个文件的 URI(计算机上的本地路径,`hdfs://``s3n://` 等等的 URI),并且读取它们作为一个 lines(行)的集合。下面是一个调用示例:
......@@ -770,7 +770,7 @@ print("Counter value: ", counter)
上面的代码行为是不确定的,并且可能无法按预期正常工作。执行作业时,Spark 会分解 RDD 操作到每个 executor 中的 task 里。在执行之前,Spark 计算任务的 **closure**(闭包)。闭包是指 executor 要在RDD上进行计算时必须对执行节点可见的那些变量和方法(在这里是foreach())。闭包被序列化并被发送到每个 executor。
闭包的变量副本发给每个 **executor** ,当 **counter**`foreach` 函数引用的时候,它已经不再是 driver node 的 **counter** 了。虽然在 driver node 仍然有一个 counter 在内存中,但是对 executors 已经不可见。executor 看到的只是序列化的闭包一个副本。所以 **counter** 最终的值还是 0,因为对 `counter` 所有的操作均引用序列化的 closure 内的值。
闭包的变量副本发给每个 **executor**,当 **counter**`foreach` 函数引用的时候,它已经不再是 driver node 的 **counter** 了。虽然在 driver node 仍然有一个 counter 在内存中,但是对 executors 已经不可见。executor 看到的只是序列化的闭包一个副本。所以 **counter** 最终的值还是 0,因为对 `counter` 所有的操作均引用序列化的 closure 内的值。
`local` 本地模式,在某些情况下的 `foreach` 功能实际上是同一 JVM 上的驱动程序中执行,并会引用同一个原始的 **counter** 计数器,实际上可能更新.
......@@ -800,7 +800,7 @@ val counts = pairs.reduceByKey((a, b) => a + b)
我们也可以使用 `counts.sortByKey()` ,例如,在对按字母顺序排序,最后 `counts.collect()` 把他们作为一个数据对象返回给 driver 程序。
我们也可以使用 `counts.sortByKey()`,例如,在对按字母顺序排序,最后 `counts.collect()` 把他们作为一个数据对象返回给 driver 程序。
**Note(注意):** 当在 key-value pair 操作中使用自定义的 objects 作为 key 时,您必须确保有一个自定义的 `equals()` 方法有一个 `hashCode()` 方法相匹配. 有关详情,请参阅 [Object.hashCode() documentation](http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()) 中列出的约定.
......@@ -943,7 +943,7 @@ RDD 可以使用 `persist()` 方法或 `cache()` 方法进行持久化。数据
| MEMORY_ONLY_SER
(Java and Scala) | 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 [fast serializer](tuning.html) 时会节省更多的空间,但是在读取时会增加 CPU 的计算负担. |
| MEMORY_AND_DISK_SER
(Java and Scala) | 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算. |
(Java and Scala) | 类似于 MEMORY_ONLY_SER,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算. |
| DISK_ONLY | 只在磁盘上缓存 RDD. |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本. |
| OFF_HEAP (experimental 实验性) | 类似于 MEMORY_ONLY_SER,但是将数据存储在 [off-heap memory](configuration.html#memory-management) 中. 这需要启用 off-heap 内存. |
......
此差异已折叠。
......@@ -147,7 +147,7 @@ sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.s
<small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small>
注意第一次调用时,`sparkR.session()` 初始化一个全局的 `SparkSession` 单实例,并且总是返回一个引用此实例,可以连续的调用. 通过这种方式,用户仅需要创建一次 `SparkSession` ,然后像 `read.df` SparkR函数就能够立即获取全局的实例,用户不需要再 `SparkSession` 之间进行实例的传递.
注意第一次调用时,`sparkR.session()` 初始化一个全局的 `SparkSession` 单实例,并且总是返回一个引用此实例,可以连续的调用. 通过这种方式,用户仅需要创建一次 `SparkSession`,然后像 `read.df` SparkR函数就能够立即获取全局的实例,用户不需要再 `SparkSession` 之间进行实例的传递.
Spark 2.0 中的`SparkSession` 为 Hive 特性提供了内嵌的支持,包括使用 HiveQL 编写查询的能力,访问 Hive UDF,以及从 Hive 表中读取数据的能力.为了使用这些特性,你不需要去有一个已存在的 Hive 设置.
......@@ -1516,7 +1516,7 @@ df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
### Save Modes (保存模式)
Save operations (保存操作)可以选择使用 `SaveMode` ,它指定如何处理现有数据如果存在的话. 重要的是要意识到,这些 save modes (保存模式)不使用任何 locking (锁定)并且不是 atomic (原子). 另外,当执行 `Overwrite` 时,数据将在新数据写出之前被删除.
Save operations (保存操作)可以选择使用 `SaveMode`,它指定如何处理现有数据如果存在的话. 重要的是要意识到,这些 save modes (保存模式)不使用任何 locking (锁定)并且不是 atomic (原子). 另外,当执行 `Overwrite` 时,数据将在新数据写出之前被删除.
| Scala/Java | Any Language | Meaning |
| --- | --- | --- |
......@@ -1836,7 +1836,7 @@ path
通过将 `path/to/table` 传递给 `SparkSession.read.parquet``SparkSession.read.load` ,Spark SQL 将自动从路径中提取 partitioning information (分区信息). 现在返回的 DataFrame 的 schema (模式)变成:
通过将 `path/to/table` 传递给 `SparkSession.read.parquet``SparkSession.read.load`,Spark SQL 将自动从路径中提取 partitioning information (分区信息). 现在返回的 DataFrame 的 schema (模式)变成:
......@@ -1852,11 +1852,11 @@ root
请注意,会自动 inferred (推断) partitioning columns (分区列)的 data types (数据类型).目前,支持 numeric data types (数字数据类型)和 string type (字符串类型).有些用户可能不想自动推断 partitioning columns (分区列)的数据类型.对于这些用例,automatic type inference (自动类型推断)可以由 `spark.sql.sources.partitionColumnTypeInference.enabled` 配置,默认为 `true` .当禁用 type inference (类型推断)时,string type (字符串类型)将用于 partitioning columns (分区列).
从 Spark 1.6.0 开始,默认情况下,partition discovery (分区发现)只能找到给定路径下的 partitions (分区).对于上述示例,如果用户将 `path/to/table/gender=male` 传递给 `SparkSession.read.parquet``SparkSession.read.load` ,则 `gender` 将不被视为 partitioning column (分区列).如果用户需要指定 partition discovery (分区发现)应该开始的基本路径,则可以在数据源选项中设置 `basePath`.例如,当 `path/to/table/gender=male` 是数据的路径并且用户将 `basePath` 设置为 `path/to/table/``gender` 将是一个 partitioning column (分区列).
从 Spark 1.6.0 开始,默认情况下,partition discovery (分区发现)只能找到给定路径下的 partitions (分区).对于上述示例,如果用户将 `path/to/table/gender=male` 传递给 `SparkSession.read.parquet``SparkSession.read.load`,则 `gender` 将不被视为 partitioning column (分区列).如果用户需要指定 partition discovery (分区发现)应该开始的基本路径,则可以在数据源选项中设置 `basePath`.例如,当 `path/to/table/gender=male` 是数据的路径并且用户将 `basePath` 设置为 `path/to/table/``gender` 将是一个 partitioning column (分区列).
### Schema Merging (模式合并)
像 ProtocolBuffer ,Avro 和 Thrift 一样,Parquet 也支持 schema evolution (模式演进). 用户可以从一个 simple schema (简单的架构)开始,并根据需要逐渐向 schema 添加更多的 columns (列). 以这种方式,用户可能会使用不同但相互兼容的 schemas 的 multiple Parquet files (多个 Parquet 文件). Parquet data source (Parquet 数据源)现在能够自动检测这种情况并 merge (合并)所有这些文件的 schemas .
像 ProtocolBuffer,Avro 和 Thrift 一样,Parquet 也支持 schema evolution (模式演进). 用户可以从一个 simple schema (简单的架构)开始,并根据需要逐渐向 schema 添加更多的 columns (列). 以这种方式,用户可能会使用不同但相互兼容的 schemas 的 multiple Parquet files (多个 Parquet 文件). Parquet data source (Parquet 数据源)现在能够自动检测这种情况并 merge (合并)所有这些文件的 schemas .
由于 schema merging (模式合并)是一个 expensive operation (相对昂贵的操作),并且在大多数情况下不是必需的,所以默认情况下从 1.5.0 开始. 你可以按照如下的方式启用它:
......@@ -2082,14 +2082,14 @@ REFRESH TABLE my_table;
| Property Name (参数名称) | Default(默认) | Meaning(含义) |
| --- | --- | --- |
| `spark.sql.parquet.binaryAsString` | false | 一些其他 Parquet-producing systems (Parquet 生产系统),特别是 Impala,Hive 和旧版本的 Spark SQL ,在 writing out (写出) Parquet schema 时,不区分 binary data (二进制数据)和 strings (字符串). 该 flag 告诉 Spark SQL 将 binary data (二进制数据)解释为 string (字符串)以提供与这些系统的兼容性. |
| `spark.sql.parquet.int96AsTimestamp` | true | 一些 Parquet-producing systems ,特别是 Impala 和 Hive ,将 Timestamp 存入INT96 . 该 flag 告诉 Spark SQL 将 INT96 数据解析为 timestamp 以提供与这些系统的兼容性. |
| `spark.sql.parquet.binaryAsString` | false | 一些其他 Parquet-producing systems (Parquet 生产系统),特别是 Impala,Hive 和旧版本的 Spark SQL,在 writing out (写出) Parquet schema 时,不区分 binary data (二进制数据)和 strings (字符串). 该 flag 告诉 Spark SQL 将 binary data (二进制数据)解释为 string (字符串)以提供与这些系统的兼容性. |
| `spark.sql.parquet.int96AsTimestamp` | true | 一些 Parquet-producing systems,特别是 Impala 和 Hive,将 Timestamp 存入INT96 . 该 flag 告诉 Spark SQL 将 INT96 数据解析为 timestamp 以提供与这些系统的兼容性. |
| `spark.sql.parquet.cacheMetadata` | true | 打开 Parquet schema metadata 的缓存. 可以加快查询静态数据. |
| `spark.sql.parquet.compression.codec` | snappy | 在编写 Parquet 文件时设置 compression codec (压缩编解码器)的使用. 可接受的值包括: uncompressed,snappy,gzip,lzo . |
| `spark.sql.parquet.filterPushdown` | true | 设置为 true 时启用 Parquet filter push-down optimization . |
| `spark.sql.hive.convertMetastoreParquet` | true | 当设置为 false 时,Spark SQL 将使用 Hive SerDe 作为 parquet tables ,而不是内置的支持. |
| `spark.sql.parquet.mergeSchema` | false | 当为 true 时,Parquet data source (Parquet 数据源) merges (合并)从所有 data files (数据文件)收集的 schemas ,否则如果没有可用的 summary file ,则从 summary file 或 random data file 中挑选 schema . |
| `spark.sql.optimizer.metadataOnly` | true | 如果为 true ,则启用使用表的 metadata 的 metadata-only query optimization 来生成 partition columns (分区列)而不是 table scans (表扫描). 当 scanned (扫描)的所有 columns (列)都是 partition columns (分区列)并且 query (查询)具有满足 distinct semantics (不同语义)的 aggregate operator (聚合运算符)时,它将适用. |
| `spark.sql.hive.convertMetastoreParquet` | true | 当设置为 false 时,Spark SQL 将使用 Hive SerDe 作为 parquet tables,而不是内置的支持. |
| `spark.sql.parquet.mergeSchema` | false | 当为 true 时,Parquet data source (Parquet 数据源) merges (合并)从所有 data files (数据文件)收集的 schemas,否则如果没有可用的 summary file,则从 summary file 或 random data file 中挑选 schema . |
| `spark.sql.optimizer.metadataOnly` | true | 如果为 true,则启用使用表的 metadata 的 metadata-only query optimization 来生成 partition columns (分区列)而不是 table scans (表扫描). 当 scanned (扫描)的所有 columns (列)都是 partition columns (分区列)并且 query (查询)具有满足 distinct semantics (不同语义)的 aggregate operator (聚合运算符)时,它将适用. |
## JSON Datasets (JSON 数据集)
......@@ -2192,7 +2192,7 @@ anotherPeople.show();
<small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small>
Spark SQL 可以 automatically infer (自动推断)JSON dataset 的 schema ,并将其作为 DataFrame 加载. 可以使用 JSON 文件中的 `SparkSession.read.json` 进行此 conversion (转换).
Spark SQL 可以 automatically infer (自动推断)JSON dataset 的 schema,并将其作为 DataFrame 加载. 可以使用 JSON 文件中的 `SparkSession.read.json` 进行此 conversion (转换).
请注意,以 _a json file_ 提供的文件不是典型的 JSON 文件. 每行必须包含一个 separate (单独的),self-contained valid (独立的有效的)JSON 对象. 有关更多信息,请参阅 [JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/)
......@@ -2241,7 +2241,7 @@ otherPeople.show()
<small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small>
Spark SQL 可以 automatically infer (自动推断)JSON dataset 的 schema ,并将其作为 DataFrame 加载. 使用 `read.json()` 函数,它从 JSON 文件的目录中加载数据,其中每一行文件都是一个 JSON 对象.
Spark SQL 可以 automatically infer (自动推断)JSON dataset 的 schema,并将其作为 DataFrame 加载. 使用 `read.json()` 函数,它从 JSON 文件的目录中加载数据,其中每一行文件都是一个 JSON 对象.
请注意,以 _a json file_ 提供的文件不是典型的 JSON 文件. 每行必须包含一个 separate (单独的),self-contained valid (独立的有效的)JSON 对象. 有关更多信息,请参阅 [JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/).
......@@ -3081,7 +3081,7 @@ Spark SQL 在设计时就考虑到了和 Hive metastore,SerDes 以及 UDF 之
#### 在现有的 Hive Warehouses 中部署
Spark SQL Thrift JDBC server 采用了开箱即用的设计以兼容已有的 Hive 安装版本。你不需要修改现有的 Hive Metastore ,或者改变数据的位置和表的分区。
Spark SQL Thrift JDBC server 采用了开箱即用的设计以兼容已有的 Hive 安装版本。你不需要修改现有的 Hive Metastore,或者改变数据的位置和表的分区。
### 所支持的 Hive 特性
......
......@@ -31,7 +31,7 @@
# 概述
GraphX 是 Spark 中用于图形和图形并行计算的新组件。在高层次上, GraphX 通过引入一个新的[图形](#property_graph)抽象来扩展 Spark [RDD](api/scala/index.html#org.apache.spark.rdd.RDD) :一种具有附加到每个顶点和边缘的属性的定向多重图形。为了支持图计算,GraphX 公开了一组基本运算符(例如: [subgraph](#structural_operators) [joinVertices](#join_operators)[aggregateMessages](#aggregateMessages) )以及 [Pregel](#pregel) API 的优化变体。此外,GraphX 还包括越来越多的图形[算法](#graph_algorithms)[构建器](#graph_builders),以简化图形分析任务。
GraphX 是 Spark 中用于图形和图形并行计算的新组件。在高层次上, GraphX 通过引入一个新的[图形](#property_graph)抽象来扩展 Spark [RDD](api/scala/index.html#org.apache.spark.rdd.RDD) :一种具有附加到每个顶点和边缘的属性的定向多重图形。为了支持图计算,GraphX 公开了一组基本运算符(例如: [subgraph](#structural_operators)[joinVertices](#join_operators)[aggregateMessages](#aggregateMessages) )以及 [Pregel](#pregel) API 的优化变体。此外,GraphX 还包括越来越多的图形[算法](#graph_algorithms)[构建器](#graph_builders),以简化图形分析任务。
# 入门
......@@ -56,7 +56,7 @@ import org.apache.spark.rdd.RDD
属性图是通过 vertex (`VD`)和 edge (`ED`) 类型进行参数化的。这些是分别与每个顶点和边缘相关联的对象的类型。
> 当它们是原始数据类型(例如: int ,double 等等)时,GraphX 优化顶点和边缘类型的表示,通过将其存储在专门的数组中来减少内存占用。
> 当它们是原始数据类型(例如: int,double 等等)时,GraphX 优化顶点和边缘类型的表示,通过将其存储在专门的数组中来减少内存占用。
在某些情况下,可能希望在同一个图形中具有不同属性类型的顶点。这可以通过继承来实现。例如,将用户和产品建模为二分图,我们可能会执行以下操作:
......@@ -144,7 +144,7 @@ graph.edges.filter(e => e.srcId > e.dstId).count
> 注意, `graph.vertices` 返回一个 `VertexRDD[(String, String)]` 扩展 `RDD[(VertexId, (String, String))]` ,所以我们使用 scala `case` 表达式来解构元组。另一方面, `graph.edges` 返回一个 `EdgeRDD` 包含 `Edge[String]` 对象。我们也可以使用 case 类型构造函数,如下所示:
> 注意, `graph.vertices` 返回一个 `VertexRDD[(String, String)]` 扩展 `RDD[(VertexId, (String, String))]`,所以我们使用 scala `case` 表达式来解构元组。另一方面, `graph.edges` 返回一个 `EdgeRDD` 包含 `Edge[String]` 对象。我们也可以使用 case 类型构造函数,如下所示:
......@@ -418,7 +418,7 @@ class Graph[VD, ED] {
[`joinVertices`](api/scala/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]) 操作符将顶点与输入 RDD 相连,并返回一个新的图形,其中通过将用户定义的 `map` 函数应用于已连接顶点的结果而获得的顶点属性。RDD 中没有匹配值的顶点保留其原始值。
> 请注意,如果 RDD 包含给定顶点的多个值,则只能使用一个值。因此,建议使用以下命令使输入 RDD 变得独一无二,这也将对结果值进行 _pre-index_ ,以显着加速后续连接。
> 请注意,如果 RDD 包含给定顶点的多个值,则只能使用一个值。因此,建议使用以下命令使输入 RDD 变得独一无二,这也将对结果值进行 _pre-index_,以显着加速后续连接。
......@@ -448,7 +448,7 @@ val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt)
> 您可能已经注意到上述示例中使用的多个参数列表(例如: `f(a)(b)` curried 函数模式。虽然我们可以将 `f(a)(b)` 同样地写成 `f(a,b)` ,这意味着 `b` 上的类型推断不依赖于 `a`。因此,用户需要为用户定义的函数提供类型注释:
> 您可能已经注意到上述示例中使用的多个参数列表(例如: `f(a)(b)` curried 函数模式。虽然我们可以将 `f(a)(b)` 同样地写成 `f(a,b)`,这意味着 `b` 上的类型推断不依赖于 `a`。因此,用户需要为用户定义的函数提供类型注释:
......@@ -461,7 +461,7 @@ val joinedGraph = graph.joinVertices(uniqueCosts,
## 邻域聚合
许多图形分析任务的关键步骤是聚合关于每个顶点邻域的信息。例如,我们可能想知道每个用户拥有的关注者数量或每个用户的追随者的平均年龄。许多迭代图表算法(例如:网页级别,最短路径,以及连接成分)相邻顶点(例如:电流值的 PageRank ,最短到源路径,和最小可达顶点 ID )的重复聚合性质。
许多图形分析任务的关键步骤是聚合关于每个顶点邻域的信息。例如,我们可能想知道每个用户拥有的关注者数量或每个用户的追随者的平均年龄。许多迭代图表算法(例如:网页级别,最短路径,以及连接成分)相邻顶点(例如:电流值的 PageRank,最短到源路径,和最小可达顶点 ID )的重复聚合性质。
> 为了提高性能,主聚合操作员 `graph.mapReduceTriplets` 从新的更改 `graph.AggregateMessages`。虽然 API 的变化相对较小,但我们在下面提供了一个转换指南。
......@@ -483,9 +483,9 @@ class Graph[VD, ED] {
用户定义的 `sendMsg` 函数接受一个 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext) ,它将源和目标属性以及 edge 属性和函数 ([`sendToSrc`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToSrc(msg:A):Unit),和 [`sendToDst`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToDst(msg:A):Unit)) 一起发送到源和目标属性。在 map-reduce 中,将 `sendMsg` 作为 _map_ 函数。用户定义的 `mergeMsg` 函数需要两个发往同一顶点的消息,并产生一条消息。想想 `mergeMsg` 是 map-reduce 中的_reduce_ 函数。[`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 运算符返回一个 `VertexRDD[Msg]` ,其中包含去往每个顶点的聚合消息(Msg类型)。没有收到消息的顶点不包括在返回的 `VertexRDD`[VertexRDD](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 中。
用户定义的 `sendMsg` 函数接受一个 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext),它将源和目标属性以及 edge 属性和函数 ([`sendToSrc`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToSrc(msg:A):Unit),和 [`sendToDst`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToDst(msg:A):Unit)) 一起发送到源和目标属性。在 map-reduce 中,将 `sendMsg` 作为 _map_ 函数。用户定义的 `mergeMsg` 函数需要两个发往同一顶点的消息,并产生一条消息。想想 `mergeMsg` 是 map-reduce 中的_reduce_ 函数。[`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 运算符返回一个 `VertexRDD[Msg]`,其中包含去往每个顶点的聚合消息(Msg类型)。没有收到消息的顶点不包括在返回的 `VertexRDD`[VertexRDD](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 中。
另外,[`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 采用一个可选的`tripletsFields` ,它们指示在 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext) 中访问哪些数据(即源顶点属性,而不是目标顶点属性)。`tripletsFields` 定义的可能选项, [`TripletFields`](api/java/org/apache/spark/graphx/TripletFields.html) 默认值是 [`TripletFields.All`](api/java/org/apache/spark/graphx/TripletFields.html#All) 指示用户定义的 `sendMsg` 函数可以访问的任何字段[`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext)。该 `tripletFields` 参数可用于通知 GraphX ,只有部分 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext) 需要允许 GraphX 选择优化的连接策略。例如,如果我们计算每个用户的追随者的平均年龄,我们只需要源字段,因此我们将用于 [`TripletFields.Src`](api/java/org/apache/spark/graphx/TripletFields.html#Src) 表示我们只需要源字段。
另外,[`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 采用一个可选的`tripletsFields`,它们指示在 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext) 中访问哪些数据(即源顶点属性,而不是目标顶点属性)。`tripletsFields` 定义的可能选项, [`TripletFields`](api/java/org/apache/spark/graphx/TripletFields.html) 默认值是 [`TripletFields.All`](api/java/org/apache/spark/graphx/TripletFields.html#All) 指示用户定义的 `sendMsg` 函数可以访问的任何字段[`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext)。该 `tripletFields` 参数可用于通知 GraphX,只有部分 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext) 需要允许 GraphX 选择优化的连接策略。例如,如果我们计算每个用户的追随者的平均年龄,我们只需要源字段,因此我们将用于 [`TripletFields.Src`](api/java/org/apache/spark/graphx/TripletFields.html#Src) 表示我们只需要源字段。
> 在早期版本的 GraphX 中,我们使用字节码检测来推断, [`TripletFields`](api/java/org/apache/spark/graphx/TripletFields.html) 但是我们发现字节码检查稍微不可靠,而是选择了更明确的用户控制。
......@@ -540,7 +540,7 @@ class Graph[VD, ED] {
`mapReduceTriplets` 操作符接受用户定义的映射函数,该函数应用于每个三元组,并且可以使用用户定义的缩减函数来生成聚合的_消息_。然而,我们发现返回的迭代器的用户是昂贵的,并且它阻止了我们应用其他优化(例如:局部顶点重新编号)的能力。在 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 中,我们引入了 EdgeContext ,它暴露了三元组字段,并且还显示了向源和目标顶点发送消息的功能。此外,我们删除了字节码检查,而是要求用户指出三元组中实际需要哪些字段。
`mapReduceTriplets` 操作符接受用户定义的映射函数,该函数应用于每个三元组,并且可以使用用户定义的缩减函数来生成聚合的_消息_。然而,我们发现返回的迭代器的用户是昂贵的,并且它阻止了我们应用其他优化(例如:局部顶点重新编号)的能力。在 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 中,我们引入了 EdgeContext,它暴露了三元组字段,并且还显示了向源和目标顶点发送消息的功能。此外,我们删除了字节码检查,而是要求用户指出三元组中实际需要哪些字段。
以下代码块使用 `mapReduceTriplets`
......@@ -612,7 +612,7 @@ class GraphOps[VD, ED] {
在 Spark 中,默认情况下,RDD 不会保留在内存中。为了避免重新计算,在多次使用它们时,必须明确缓存它们(参见 [Spark Programming Guide](programming-guide.html#rdd-persistence) )。GraphX 中的图形表现方式相同。**当多次使用图表时,请务必先调用 [`Graph.cache()`](api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED])。**
在迭代计算中,_uncaching_ 也可能是最佳性能所必需的。默认情况下,缓存的 RDD 和图形将保留在内存中,直到内存压力迫使它们以 LRU 顺序逐出。对于迭代计算,来自先前迭代的中间结果将填满缓存。虽然它们最终被驱逐出来,但存储在内存中的不必要的数据会减慢垃圾收集速度。一旦不再需要中间结果,就会更有效率。这涉及每次迭代实现(缓存和强制)图形或 RDD ,取消所有其他数据集,并且仅在将来的迭代中使用实例化数据集。然而,由于图形由多个 RDD 组成,所以很难将它们正确地分开。**对于迭代计算,我们建议使用Pregel API,它可以正确地解析中间结果。**
在迭代计算中,_uncaching_ 也可能是最佳性能所必需的。默认情况下,缓存的 RDD 和图形将保留在内存中,直到内存压力迫使它们以 LRU 顺序逐出。对于迭代计算,来自先前迭代的中间结果将填满缓存。虽然它们最终被驱逐出来,但存储在内存中的不必要的数据会减慢垃圾收集速度。一旦不再需要中间结果,就会更有效率。这涉及每次迭代实现(缓存和强制)图形或 RDD,取消所有其他数据集,并且仅在将来的迭代中使用实例化数据集。然而,由于图形由多个 RDD 组成,所以很难将它们正确地分开。**对于迭代计算,我们建议使用Pregel API,它可以正确地解析中间结果。**
# Pregel API
......@@ -622,7 +622,7 @@ class GraphOps[VD, ED] {
> 注意,与更多的标准 Pregel 实现不同,GraphX 中的顶点只能将消息发送到相邻顶点,并且使用用户定义的消息传递功能并行完成消息构造。这些约束允许在 GraphX 中进行额外优化。
以下是 [Pregel 运算符](api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]) 的类型签名以及 其实现的_草图_(注意:为了避免由于长谱系链引起的 stackOverflowError , pregel 支持周期性检查点图和消息,将 “spark.graphx.pregel.checkpointInterval” 设置为正数,说10。并使用 SparkContext.setCheckpointDir(directory: String)) 设置 checkpoint 目录):
以下是 [Pregel 运算符](api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]) 的类型签名以及 其实现的_草图_(注意:为了避免由于长谱系链引起的 stackOverflowError, pregel 支持周期性检查点图和消息,将 “spark.graphx.pregel.checkpointInterval” 设置为正数,说10。并使用 SparkContext.setCheckpointDir(directory: String)) 设置 checkpoint 目录):
......@@ -714,7 +714,7 @@ object GraphLoader {
[`GraphLoader.edgeListFile`](api/scala/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]) 提供了从磁盘边缘列表中加载图形的方法。它解析以下形式的(源顶点 ID ,目标顶点 ID )对的邻接列表,跳过以下开始的注释行 `#`
[`GraphLoader.edgeListFile`](api/scala/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]) 提供了从磁盘边缘列表中加载图形的方法。它解析以下形式的(源顶点 ID,目标顶点 ID )对的邻接列表,跳过以下开始的注释行 `#`
```
# This is a comment
......@@ -849,7 +849,7 @@ PageRank 测量在图中每个顶点的重要性,假设从边缘 _u_ 到 _v_
GraphX 附带了 PageRank 的静态和动态实现方法作[`PageRank 对象`](api/scala/index.html#org.apache.spark.graphx.lib.PageRank$)上的方法。静态 PageRank 运行固定次数的迭代,而动态 PageRank 运行直到排列收敛(即,停止改变超过指定的公差)。[`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps) 允许直接调用这些算法作为方法 `Graph`
GraphX还包括一个可以运行 PageRank 的社交网络数据集示例。给出了一组用户 `data/graphx/users.txt` ,并给出了一组用户之间的关系 `data/graphx/followers.txt`。我们计算每个用户的 PageRank 如下:
GraphX还包括一个可以运行 PageRank 的社交网络数据集示例。给出了一组用户 `data/graphx/users.txt`,并给出了一组用户之间的关系 `data/graphx/followers.txt`。我们计算每个用户的 PageRank 如下:
```
import org.apache.spark.graphx.GraphLoader
......@@ -928,7 +928,7 @@ println(triCountByUsername.collect().mkString("\n"))
# 示例
假设我想从一些文本文件中构建图形,将图形限制为重要的关系和用户,在 sub-graph 上运行 page-rank ,然后返回与顶级用户关联的属性。我可以用 GraphX 在几行内完成所有这些:
假设我想从一些文本文件中构建图形,将图形限制为重要的关系和用户,在 sub-graph 上运行 page-rank,然后返回与顶级用户关联的属性。我可以用 GraphX 在几行内完成所有这些:
```
import org.apache.spark.graphx.GraphLoader
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册