提交 acdb2fe1 编写于 作者: 取昵称好难啊's avatar 取昵称好难啊

Add space and fix markdown syntax

上级 ec47ea3c
......@@ -4,13 +4,13 @@ Apache Spark 是一个快速的,多用途的集群计算系统。它提供了
# 下载
从该项目官网的 [下载页面](http://spark.apache.org/downloads.html) 获取 Spark。该文档用于 Spark 2.2.0 版本。Spark 可以通过 Hadoop client 库使用 HDFS 和 YARN。下载一个预编译主流Hadoop版本比较麻烦。用户可以下载一个编译好的 Hadoop 版本,并且可以 通过[设置 Spark 的 classpath](hadoop-provided.html) 来与任何的 Hadoop 版本一起运行 Spark。Scala 和 Java 用户可以在他们的工程中通过Maven的方式引入 Spark,并且在将来 Python 用户也可以从 PyPI 中安装 Spark。
从该项目官网的 [下载页面](http://spark.apache.org/downloads.html) 获取 Spark。该文档用于 Spark 2.2.0 版本。Spark 可以通过 Hadoop client 库使用 HDFS 和 YARN。下载一个预编译主流 Hadoop 版本比较麻烦。用户可以下载一个编译好的 Hadoop 版本,并且可以 通过[设置 Spark 的 classpath](hadoop-provided.html) 来与任何的 Hadoop 版本一起运行 Spark。Scala 和 Java 用户可以在他们的工程中通过 Maven 的方式引入 Spark,并且在将来 Python 用户也可以从 PyPI 中安装 Spark。
如果您希望从源码中编译一个Spark,请访问 [编译 Spark](building-spark.html)
Spark 可以在 windows 和 unix 类似的系统(例如,Linux,Mac OS)上运行。它可以很容易的在一台本地机器上运行 -你只需要安装一个 JAVA 环境并配置 PATH 环境变量,或者让 JAVA_HOME 指向你的 JAVA 安装路径
Spark 可运行在 Java 8+,Python 2.7+/3.4+ 和 R 3.1+ 的环境上。针对 Scala API,Spark 2.2.0 使用了 Scala 2.11\。您将需要去使用一个可兼容的 Scala 版本 (2.11.x)
Spark 可运行在 Java 8+,Python 2.7+/3.4+ 和 R 3.1+ 的环境上。针对 Scala API,Spark 2.2.0 使用了 Scala 2.11\。您将需要去使用一个可兼容的 Scala 版本(2.11.x)
请注意,从 Spark 2.2.0 起,对 Java 7,Python 2.6 和旧的 Hadoop 2.6.5 之前版本的支持均已被删除。
......
......@@ -24,7 +24,7 @@ Spark 应用在集群上作为独立的进程组来运行,在您的 main 程
* [Standalone](spark-standalone.html) – 包含在 Spark 中使得它更容易来安装集群的一个简单的 Cluster Manager。
* [Apache Mesos](running-on-mesos.html) – 一个通用的 Cluster Manager,它也可以运行 Hadoop MapReduce 和其它服务应用。
* [Hadoop YARN](running-on-yarn.html) –Hadoop 2 中的 resource manager(资源管理器)。
* [Hadoop YARN](running-on-yarn.html) Hadoop 2 中的 resource manager(资源管理器)。
* [Kubernetes (experimental)](https://github.com/apache-spark-on-k8s/spark) – 除了上述之外,还有 Kubernetes 的实验支持。Kubernetes 提供以容器为中心的基础设施的开源平台。Kubernetes 的支持正在 apache-spark-on-k8s Github 组织中积极开发。有关文档,请参阅该项目的 README。
# 提交应用程序
......
......@@ -26,8 +26,8 @@
一些常用的 options(选项)有 :
* `--class`:您的应用程序的入口点(例如 `org.apache.spark.examples.SparkPi`)
* `--master`:集群的 [master URL](#master-urls) (例如 `spark://23.195.26.187:7077`)
* `--deploy-mode`:是在 worker 节点(`cluster`) 上还是在本地作为一个外部的客户端(`client`) 部署您的 driver(默认:`client`) **†**
* `--master`:集群的 [master URL](#master-urls)(例如 `spark://23.195.26.187:7077`
* `--deploy-mode`:是在 worker 节点`cluster`)上还是在本地作为一个外部的客户端(`client`)部署您的 driver(默认:`client` **†**
* `--conf`:按照 key=value 格式任意的 Spark 配置属性。对于包含空格的 value(值)使用引号包 “key=value” 起来。
* `application-jar`:包括您的应用以及所有依赖的一个打包的 Jar 的路径。该 URL 在您的集群上必须是全局可见的,例如,一个 `hdfs://` path 或者一个 `file://` 在所有节点是可见的。
* `application-arguments`:传递到您的 main class 的 main 方法的参数,如果有的话。
......@@ -105,7 +105,7 @@ export HADOOP_CONF_DIR=XXX
| --- | --- |
| `local` | 使用一个线程本地运行 Spark(即,没有并行性)。 |
| `local[K]` | 使用 K 个 worker 线程本地运行 Spark(理想情况下,设置这个值的数量为您机器的 core 数量)。 |
| `local[K,F]` | 使用 K 个 worker 线程本地运行 Spark并允许最多失败 F次 (查阅 [spark.task.maxFailures](configuration.html#scheduling) 以获取对该变量的解释) |
| `local[K,F]` | 使用 K 个 worker 线程本地运行 Spark并允许最多失败 F次(查阅 [spark.task.maxFailures](configuration.html#scheduling) 以获取对该变量的解释)|
| `local[*]` | 使用更多的 worker 线程作为逻辑的 core 在您的机器上来本地的运行 Spark。 |
| `local[*,F]` | 使用更多的 worker 线程作为逻辑的 core 在您的机器上来本地的运行 Spark并允许最多失败 F次。 |
| `spark://HOST:PORT` | 连接至给定的 [Spark standalone cluster](spark-standalone.html) master. master。该 port(端口)必须有一个作为您的 master 配置来使用,默认是 7077。 |
......@@ -129,11 +129,11 @@ Spark 使用下面的 URL 格式以允许传播 jar 时使用不同的策略 :
* **file:** - 绝对路径和 `file:/` URI 通过 driver 的 HTTP file server 提供服务,并且每个 executor 会从 driver 的 HTTP server 拉取这些文件。
* **hdfs:****http:****https:****ftp:** - 如预期的一样拉取下载文件和 JAR
* **local:** - 一个用 local:/ 开头的 URL 预期作在每个 worker 节点上作为一个本地文件存在。这样意味着没有网络 IO 发生,并且非常适用于那些已经被推送到每个 worker 或通过 NFS,GlusterFS等共享的大型的 file/JAR。
* **local:** - 一个用 local:/ 开头的 URL 预期作在每个 worker 节点上作为一个本地文件存在。这样意味着没有网络 IO 发生,并且非常适用于那些已经被推送到每个 worker 或通过 NFS,GlusterFS 等共享的大型的 file/JAR。
N注意,那些 JAR 和文件被复制到 working directory(工作目录)用于在 executor 节点上的每个 SparkContext。这可以使用最多的空间显著量随着时间的推移,将需要清理。在 Spark On YARN 模式中,自动执行清理操作。在 Spark standalone 模式中,可以通过配置 `spark.worker.cleanup.appDataTtl` 属性来执行自动清理。
N 注意,那些 JAR 和文件被复制到 working directory(工作目录)用于在 executor 节点上的每个 SparkContext。这可以使用最多的空间显著量随着时间的推移,将需要清理。在 Spark On YARN 模式中,自动执行清理操作。在 Spark standalone 模式中,可以通过配置 `spark.worker.cleanup.appDataTtl` 属性来执行自动清理。
用户也可以通过使用 `--packages`来提供一个逗号分隔的 maven coordinates(maven 坐标)以包含任何其它的依赖。在使用这个命令时所有可传递的依赖将被处理。其它的 repository(或者在 SBT 中被解析的)可以使用 `--repositories`该标记添加到一个逗号分隔的样式中。(注意,对于那些设置了密码保护的库,在一些情况下可以在库URL中提供验证信息,例如 `https://user:password@host/...`。以这种方式提供验证信息需要小心。) 这些命令可以与 `pyspark``spark-shell``spark-submit` 配置会使用以包含 Spark Packages(Spark 包)。对于 Python 来说,也可以使用 `--py-files` 选项用于分发 `.egg``.zip``.py` libraries 到 executor 中。
用户也可以通过使用 `--packages`来提供一个逗号分隔的 maven coordinates(maven 坐标)以包含任何其它的依赖。在使用这个命令时所有可传递的依赖将被处理。其它的 repository(或者在 SBT 中被解析的)可以使用 `--repositories`该标记添加到一个逗号分隔的样式中。(注意,对于那些设置了密码保护的库,在一些情况下可以在库URL中提供验证信息,例如 `https://user:password@host/...`。以这种方式提供验证信息需要小心。)这些命令可以与 `pyspark``spark-shell``spark-submit` 配置会使用以包含 Spark Packages(Spark 包)。对于 Python 来说,也可以使用 `--py-files` 选项用于分发 `.egg``.zip``.py` libraries 到 executor 中。
# 更多信息
......
......@@ -76,7 +76,7 @@ Spark 除了运行在 Mesos 或者 YARN 上以外,Spark 还提供了一个简
| `SPARK_MASTER_OPTS` | 仅应用到 master 上的配置属性,格式是 "-Dx=y"(默认是:none)。查看下面的列表可能选项。 |
| `SPARK_LOCAL_DIRS` | Spark 中 "scratch" space(暂存空间)的目录,包括 map 的输出文件和存储在磁盘上的 RDDs。这必须在您的系统中的一个快速的,本地的磁盘上。这也可以是逗号分隔的不同磁盘上的多个目录的列表。 |
| `SPARK_WORKER_CORES` | 机器上 Spark 应用程序可以使用的全部的 cores(核)的数量。(默认:全部的核可用)|
| `SPARK_WORKER_MEMORY` | 机器上 Spark 应用程序可以使用的全部的内存数量,例如 `1000m``2g`(默认:全部的内存减去 1 GB);注意每个应用程序的_individual(独立)_内存是使用 `spark.executor.memory` 属性进行配置的。 |
| `SPARK_WORKER_MEMORY` | 机器上 Spark 应用程序可以使用的全部的内存数量,例如 `1000m``2g`(默认:全部的内存减去 1 GB);注意每个应用程序的 _individual(独立)_ 内存是使用 `spark.executor.memory` 属性进行配置的。 |
| `SPARK_WORKER_PORT` | 在一个指定的 port(端口)上启动 Spark worker(默认: random(随机))|
| `SPARK_WORKER_WEBUI_PORT` | worker 的 web UI 的 Port(端口)(默认:8081)|
| `SPARK_WORKER_DIR` | 运行应用程序的目录,这个目录中包含日志和暂存空间(default:SPARK_HOME/work)|
......@@ -166,7 +166,7 @@ Spark 的 standalone 模式提供了一个基于 web 的用户接口来监控集
# 与 Hadoop 集成
您可以运行 Spark 集成到您现有的 Hadoop 集群,只需在同一台机器上将其作为单独的服务启动。要从 Spark 访问 Hadoop 的数据,只需要使用 hdfs:// URL (通常为 `hdfs://<namenode>:9000/path`,但是您可以在您的 Hadoop Namenode 的 web UI 中找到正确的 URL。) 或者,您可以为 Spark 设置一个单独的集群,并且仍然可以通过网络访问 HDFS ;这将比磁盘本地访问速度慢,但是如果您仍然在同一个局域网中运行(例如,您将 Hadoop 上的每个机架放置几台 Spark 机器),可能不会引起关注。
您可以运行 Spark 集成到您现有的 Hadoop 集群,只需在同一台机器上将其作为单独的服务启动。要从 Spark 访问 Hadoop 的数据,只需要使用 hdfs:// URL(通常为 `hdfs://<namenode>:9000/path`,但是您可以在您的 Hadoop Namenode 的 web UI 中找到正确的 URL。)或者,您可以为 Spark 设置一个单独的集群,并且仍然可以通过网络访问 HDFS ;这将比磁盘本地访问速度慢,但是如果您仍然在同一个局域网中运行(例如,您将 Hadoop 上的每个机架放置几台 Spark 机器),可能不会引起关注。
# 配置网络安全端口
......@@ -196,7 +196,7 @@ Spark 对网络的需求比较高,并且一些环境对于使用严格的防
为了调度新的应用程序或者添加新的 Worker 到集群中,他们需要知道当前的 leader 的 IP 地址。这可以通过简单地传递一个您在一个单一的进程中传递的 Masters 的列表来完成。例如,您可以启动您的 SparkContext 指向 `spark://host1:port1,host2:port2`。这将导致您的 SparkContext 尝试去注册两个 Masters – 如果 `host1` 宕掉,这个配置仍然是正确地,因为我们将会发现新的 leader `host2`
在 “registering with a Master(使用 Master 注册)” 与正常操作之间有一个重要的区别。当启动的时候,一个应用程序或者 Worker 需要使用当前的 lead Master 找到并且注册。一旦它成功注册,它就是 “in the system(在系统中)”了(即存储在了 ZooKeeper 中)。如果发生故障切换,新的 leader 将会联系所有值钱已经注册的应用程序和 Workers 来通知他们领导层的变化,所以他们甚至不知道新的 Master 在启动时的存在。
在 “registering with a Master(使用 Master 注册)” 与正常操作之间有一个重要的区别。当启动的时候,一个应用程序或者 Worker 需要使用当前的 lead Master 找到并且注册。一旦它成功注册,它就是 “in the system(在系统中)” 了(即存储在了 ZooKeeper 中)。如果发生故障切换,新的 leader 将会联系所有值钱已经注册的应用程序和 Workers 来通知他们领导层的变化,所以他们甚至不知道新的 Master 在启动时的存在。
由于这个属性,新的 Masters 可以在任何时间创建,唯一需要担心的是,_new_ 应用程序和 Workers 可以找到它注册,以防其成为 leader。一旦注册了之后,您将被照顾。
......@@ -219,4 +219,4 @@ ZooKeeper 是生产级别的高可用性的最佳方法,但是如果您只是
* 该解决方案可以与像 [monit](http://mmonit.com/monit/) 这样的过程 monitor/manager 一起使用,或者只是通过重新启动手动恢复。
* 尽管文件系统恢复似乎比完全没有任何恢复更好,但是对于某些特定的开发或者实验目的,此模式可能不太适合。特别是,通过 stop-master.sh 杀死 master 并不会清除其恢复状态,所以每当重新启动一个新的 Master 时,它将进入恢复模式。如果需要等待所有先前注册的 Worker/clients 超时,这可能会将启动时间增加 1 分钟。
* 虽然没有正式的支持,你也可以挂载 NFS 目录作为恢复目录。如果 original Master w安全地死亡,则您可以在不同的节点上启动 Master,这将正确恢复所有以前注册的 Workers/applications(相当于 ZooKeeper 恢复)。然而,未来的应用程序必须能够找到新的 Master 才能注册。
\ No newline at end of file
* 虽然没有正式的支持,你也可以挂载 NFS 目录作为恢复目录。如果 original Master w 安全地死亡,则您可以在不同的节点上启动 Master,这将正确恢复所有以前注册的 Workers/applications(相当于 ZooKeeper 恢复)。然而,未来的应用程序必须能够找到新的 Master 才能注册。
\ No newline at end of file
......@@ -71,7 +71,7 @@ yarn logs -applicationId <app ID>
将打印来自给定的应用程序的所有容器(containers)的所有的日志文件的内容。你还可以使用 HDFS shell 或者 API 直接在 HDFS 中查看容器日志文件(container log files)。可以通过查看您的 YARN 配置(`yarn.nodemanager.remote-app-log-dir``yarn.nodemanager.remote-app-log-dir-suffix`)找到它们所在的目录。日志还可以在 Spark Web UI 的 “执行程序(Executors)”选项卡下找到。您需要同时运行 Spark 历史记录服务器(Spark history server)和 MapReduce 历史记录服务器(MapReduce history server),并在 `yarn-site.xm`l 文件中正确配置 `yarn.log.server.url`。Spark 历史记录服务器 UI 上的日志将重定向您到 MapReduce 历史记录服务器以显示聚合日志(aggregated logs)。
当未启用日志聚合时,日志将在每台计算机上的本地保留在 `YARN_APP_LOGS_DIR 目录下`,通常配置为 `/tmp/logs` 或者 `$HADOOP_HOME/logs/userlogs`,具体取决于 Hadoop 版本和安装。查看容器(container)的日志需要转到包含它们的主机并在此目录中查看它们。子目录根据应用程序 ID(application ID)和 容器 ID(container ID)组织日志文件。日志还可以在 Spark Web UI 的 “执行程序(Executors)”选项卡下找到,并且不需要运行 MapReduce history server。
当未启用日志聚合时,日志将在每台计算机上的本地保留在 `YARN_APP_LOGS_DIR 目录下`,通常配置为 `/tmp/logs` 或者 `$HADOOP_HOME/logs/userlogs`,具体取决于 Hadoop 版本和安装。查看容器(container)的日志需要转到包含它们的主机并在此目录中查看它们。子目录根据应用程序 ID(application ID)和 容器 ID(container ID)组织日志文件。日志还可以在 Spark Web UI 的 “执行程序(Executors)” 选项卡下找到,并且不需要运行 MapReduce history server。
要查看每个 container(容器)的启动环境,请将 `yarn.nodemanager.delete.debug-delay-sec` 增加到一个较大的值(例如 `36000`),然后通过 `yarn.nodemanager.local-dirs` 访问应用程序缓存,在容器启动的节点上。此目录包含启动脚本(launch script),JARs,和用于启动每个容器的所有的环境变量。这个过程对于调试 classpath 问题特别有用。(请注意,启用此功能需要集群设置的管理员权限并且还需要重新启动所有的 node managers,因此这不适用于托管集群)。
......@@ -94,10 +94,10 @@ To use a custom metrics.properties for the application master and executors, upd
| Property Name(属性名称)| Default(默认值)| Meaning(含义)|
| --- | --- | --- |
| `spark.yarn.am.memory` | `512m` | 在客户端模式下用于 YARN Application Master 的内存量,与 JVM 内存字符串格式相同 (e.g. `512m``2g`)。在 `cluster` 集群模式下,使用 `spark.driver.memory` 代替.使用小写字母尾后缀,例如. `k``m``g``t`,and `p`,分别表示 kibi-,mebi-,gibi-,tebi- 和 pebibytes,respectively。 |
| `spark.yarn.am.memory` | `512m` | 在客户端模式下用于 YARN Application Master 的内存量,与 JVM 内存字符串格式相同(例如,`512m``2g`。在 `cluster` 集群模式下,使用 `spark.driver.memory` 代替.使用小写字母尾后缀,例如. `k``m``g``t`,and `p`,分别表示 kibi-,mebi-,gibi-,tebi- 和 pebibytes,respectively。 |
| `spark.yarn.am.cores` | `1` | 在客户端模式下用于 YARN Application Master 的核数。在集群模式下,请改用 `spark.driver.cores` 代替。 |
| `spark.yarn.am.waitTime` | `100s` | 在 `cluster 集群` 模式中,YARN Application Master 等待 SparkContext 被初始化的时间。在 `client 客户端` 模式中,YARN Application Master 等待 driver 连接它的时间。 |
| `spark.yarn.submit.file.replication` | 默认的 HDFS 副本数 (通常是 `3`) | 用于应用程序上传到 HDFS 的文件的 HDFS 副本级别。这些包括诸如 Spark jar,app jar,和任何分布式缓存 files/archives 之类的东西。 |
| `spark.yarn.submit.file.replication` | 默认的 HDFS 副本数(通常是 `3` | 用于应用程序上传到 HDFS 的文件的 HDFS 副本级别。这些包括诸如 Spark jar,app jar,和任何分布式缓存 files/archives 之类的东西。 |
| `spark.yarn.stagingDir` | 文件系统中当前用户的主目录 | 提交应用程序时使用的临时目录。 |
| `spark.yarn.preserve.staging.files` | `false` | 设置为 `true` 以便在作业结束保留暂存文件(Spark jar,app jar,分布式缓存文件),而不是删除它们。 |
| `spark.yarn.scheduler.heartbeat.interval-ms` | `3000` | Spark application master 心跳到 YARN ResourceManager 中的间隔(以毫秒为单位)。该值的上限为到期时间间隔的 YARN 配置值的一半,即 `yarn.am.liveness-monitor.expiry-interval-ms`。 |
......
......@@ -329,7 +329,7 @@ It also allows a different address from the local one to be advertised to execut
| `spark.acls.enable` | false | 是否开启 Spark acls。如果开启了,它检查用户是否有权限去查看或修改 job。Note this requires the user to be known, so if the user comes across as null no checks are done. UI 利用使用过滤器验证和设置用户。 |
| `spark.admin.acls` | Empty | 逗号分隔的用户或者管理员列表,列表中的用户或管理员有查看和修改所有 Spark job 的权限。如果你运行在一个共享集群,有一组管理员或开发者帮助 debug,这个选项有用。 |
| `spark.admin.acls.groups` | Empty | 具有查看和修改对所有Spark作业的访问权限的组的逗号分隔列表。如果您有一组帮助维护和调试的 administrators 或 developers 可以使用此功能基础设施。在列表中输入 "*" 表示任何组中的任何用户都可以使用 admin 的特权。用户组是从 groups mapping provider 的实例获得的。由 `spark.user.groups.mapping` 指定。检查 entry `spark.user.groups.mapping` 了解更多详细信息。 |
| `spark.user.groups.mapping` | `org.apache.spark.security.ShellBasedGroupsMappingProvider` | 用户的组列表由特征定义的 group mapping service 决定可以通过此属性配置的org.apache.spark.security.GroupMappingServiceProvider. 提供了基于 unix shell 的默认实现 `org.apache.spark.security.ShellBasedGroupsMappingProvider` 可以指定它来解析用户的组列表。     _注意:_ 此实现仅支持基于 Unix/Linux 的环境。Windows 环境是      目前 **不** 支持。但是,通过实现可以支持新的 platform/protocol(平台/协议)trait `org.apache.spark.security.GroupMappingServiceProvider`。 |
| `spark.user.groups.mapping` | `org.apache.spark.security.ShellBasedGroupsMappingProvider` | 用户的组列表由特征定义的 group mapping service 决定可以通过此属性配置的org.apache.spark.security.GroupMappingServiceProvider. 提供了基于 unix shell 的默认实现 `org.apache.spark.security.ShellBasedGroupsMappingProvider` 可以指定它来解析用户的组列表。_注意:_ 此实现仅支持基于 Unix/Linux 的环境。Windows 环境目前是 **不** 支持。但是,通过实现可以支持新的 platform/protocol(平台/协议)trait `org.apache.spark.security.GroupMappingServiceProvider`。 |
| `spark.authenticate` | false | 是否 Spark 验证其内部连接。如果不是运行在 YARN 上,请看 `spark.authenticate.secret`。 |
| `spark.authenticate.secret` | None | 设置密钥用于 spark 组件之间进行身份验证。这需要设置 不启用运行在 yarn 和身份验证。 |
| `spark.network.crypto.enabled` | false | Enable encryption using the commons-crypto library for RPC and block transfer service. Requires `spark.authenticate` to be enabled. |
......
......@@ -72,11 +72,11 @@ history server 可以配置如下:
1. history server 显示完成的和未完成的 Spark 作业。如果应用程序在失败后进行多次尝试,将显示失败的尝试,以及任何持续未完成的尝试或最终成功的尝试。
2. 未完成的程序只会间歇性地更新。更新的时间间隔由更改文件的检查间隔 (`spark.history.fs.update.interval`) 定义。在较大的集群上,更新间隔可能设置为较大的值。查看正在运行的应用程序的方式实际上是查看自己的 Web UI。
2. 未完成的程序只会间歇性地更新。更新的时间间隔由更改文件的检查间隔`spark.history.fs.update.interval`定义。在较大的集群上,更新间隔可能设置为较大的值。查看正在运行的应用程序的方式实际上是查看自己的 Web UI。
3. 没有注册完成就退出的应用程序将被列出为未完成的,即使它们不再运行。如果应用程序崩溃,可能会发生这种情况。
4. 一个用于表示完成 Spark 作业的一种方法是明确地停止Spark Context (`sc.stop()`),或者在 Python 中使用 `with SparkContext() as sc:` 构造处理 Spark 上下文设置并拆除。
4. 一个用于表示完成 Spark 作业的一种方法是明确地停止Spark Context`sc.stop()`,或者在 Python 中使用 `with SparkContext() as sc:` 构造处理 Spark 上下文设置并拆除。
## REST API
......
......@@ -21,9 +21,9 @@
序列化在任何分布式应用程序的性能中起着重要的作用。很慢的将对象序列化或消费大量字节的格式将会大大减慢计算速度。通常,这可能是您优化 Spark 应用程序的第一件事。Spark 宗旨在于方便和性能之间取得一个平衡(允许您使用操作中的任何 Java 类型)。它提供了两种序列化库:
* [Java serialization](http://docs.oracle.com/javase/6/docs/api/java/io/Serializable.html):默认情况下,使用 Java `ObjectOutputStream` 框架的 Spark 序列化对象,并且可以与您创建的任何实现 [`java.io.Serializable`](http://docs.oracle.com/javase/6/docs/api/java/io/Serializable.html) 的类一起使用。您还可以通过扩展 [`java.io.Externalizable`](http://docs.oracle.com/javase/6/docs/api/java/io/Externalizable.html) 来更紧密地控制序列化的性能。Java 序列化是灵活的,但通常相当缓慢,并导致许多类的大型序列化格式。
* [Kryo serialization](https://github.com/EsotericSoftware/kryo):Spark 也可以使用 Kryo 库(版本2)来更快地对对象进行序列化。Kryo 比 Java 序列化(通常高达10x)要快得多,而且更紧凑,但并不支持所有的 `Serializable` 类型,并且需要先_注册_您将在程序中使用的类以获得最佳性能。
* [Kryo serialization](https://github.com/EsotericSoftware/kryo):Spark 也可以使用 Kryo 库(版本 2)来更快地对对象进行序列化。Kryo 比 Java 序列化(通常高达 10x)要快得多,而且更紧凑,但并不支持所有的 `Serializable` 类型,并且需要先 _注册_ 您将在程序中使用的类以获得最佳性能。
您可以通过使用 [SparkConf](configuration.html#spark-properties) 初始化作业 并进行调用来切换到使用 Kryo `conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")`。此设置配置用于不仅在工作节点之间进行洗牌数据的串行器,而且还将 RDD 序列化到磁盘。Kryo 不是默认的唯一原因是因为自定义注册要求,但我们建议您尝试在任何网络密集型应用程序。自从 Spark 2.0.0 以来,我们在使用简单类型,简单类型的数组或字符串类型对RDD进行混洗时,内部使用 Kryo serializer。
您可以通过使用 [SparkConf](configuration.html#spark-properties) 初始化作业 并进行调用来切换到使用 Kryo `conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")`。此设置配置用于不仅在工作节点之间进行洗牌数据的串行器,而且还将 RDD 序列化到磁盘。Kryo 不是默认的唯一原因是因为自定义注册要求,但我们建议您尝试在任何网络密集型应用程序。自从 Spark 2.0.0 以来,我们在使用简单类型,简单类型的数组或字符串类型对 RDD 进行混洗时,内部使用 Kryo serializer。
Spark 自动包含 Kryo 序列化器,用于 [Twitter chill](https://github.com/twitter/chill) 中 AllScalaRegistrar 涵盖的许多常用的核心 Scala 类。
......@@ -43,14 +43,14 @@ val sc = new SparkContext(conf)
# 内存调优
有三个方面的考虑在调整内存使用:该_量_的存储你的对象所使用的(你可能希望你的整个数据集,以适应在内存中),则_成本_访问这些对象,并且开销_垃圾收集_(如果你有高成交物品条款)。
有三个方面的考虑在调整内存使用:该 _量_ 的存储你的对象所使用的(你可能希望你的整个数据集,以适应在内存中),则 _成本_ 访问这些对象,并且开销 _垃圾收集_(如果你有高成交物品条款)。
默认情况下,Java 对象可以快速访问,但可以轻松地消耗比其字段中的 “raw” 数据多2-5倍的空间。这是由于以下几个原因:
默认情况下,Java 对象可以快速访问,但可以轻松地消耗比其字段中的 “raw” 数据多 2 - 5 倍的空间。这是由于以下几个原因:
* 每个不同的 Java 对象都有一个 “object header”,它大约是16个字节,包含一个指向它的类的指针。对于一个数据很少的对象(比如说一个`Int`字段),这可以比数据大。
* Java `String` 在原始字符串数据上具有大约40字节的开销(因为它们存储在 `Char` 数组中并保留额外的数据,例如长度),并且由于 UTF-16 的内部使用而将每个字符存储为_两个_字节 `String` 编码。因此,一个10个字符的字符串可以容易地消耗60个字节。
* 公共收集类,例如 `HashMap``LinkedList`,使用链接的数据结构,其中每个条目(例如: `Map.Entry`)存在”包装器”对象。该对象不仅具有 header,还包括指针(通常为8个字节)到列表中的下一个对象。
* 原始类型的集合通常将它们存储为”盒装”对象,例如: `java.lang.Integer`
* 每个不同的 Java 对象都有一个 “object header”,它大约是 16 个字节,包含一个指向它的类的指针。对于一个数据很少的对象(比如说一个`Int`字段),这可以比数据大。
* Java `String` 在原始字符串数据上具有大约 40 字节的开销(因为它们存储在 `Char` 数组中并保留额外的数据,例如长度),并且由于 UTF-16 的内部使用而将每个字符存储为 _两个_ 字节 `String` 编码。因此,一个 10 个字符的字符串可以容易地消耗 60 个字节。
* 公共收集类,例如 `HashMap``LinkedList`,使用链接的数据结构,其中每个条目(例如: `Map.Entry`)存在 “包装器” 对象。该对象不仅具有 header,还包括指针(通常为 8 个字节)到列表中的下一个对象。
* 原始类型的集合通常将它们存储为 “盒装” 对象,例如: `java.lang.Integer`
本节将从 Spark 的内存管理概述开始,然后讨论用户可以采取的具体策略,以便在他/她的应用程序中更有效地使用内存。具体来说,我们将描述如何确定对象的内存使用情况,以及如何改进数据结构,或通过以串行格式存储数据。然后我们将介绍调整 Spark 的缓存大小和 Java 垃圾回收器。
......@@ -110,9 +110,9 @@ Spark 中 GC 调优的目的是确保只有长寿命的 RDD 存储在 Old 版本
* 通过收集 GC 统计信息来检查垃圾收集是否太多。如果在任务完成之前多次调用完整的 GC,这意味着没有足够的可用于执行任务的内存。
* 如果太小的集合太多,而不是很多主要的 GC,为 Eden 分配更多的内存将会有所帮助。您可以将 Eden 的大小设置为对每个任务需要多少内存的估计。如果确定 Eden 的大小 `E`,那么您可以使用该选项设置年轻一代的大小 `-Xmn=4/3*E`。(按比例增加4/3是考虑幸存者地区使用的空间。)
* 如果太小的集合太多,而不是很多主要的 GC,为 Eden 分配更多的内存将会有所帮助。您可以将 Eden 的大小设置为对每个任务需要多少内存的估计。如果确定 Eden 的大小 `E`,那么您可以使用该选项设置年轻一代的大小 `-Xmn=4/3*E`。(按比例增加 4/3 是考虑幸存者地区使用的空间。)
* 在打印的 GC 统计信息中,如果 OldGen 接近于满,则通过降低减少用于缓存的内存量 `spark.memory.fraction` ; 缓存较少的对象比减慢任务执行更好。或者,考虑减少年轻一代的大小。这意味着 `-Xmn` 如果您将其设置为如上所述降低。如果没有,请尝试更改 JVM `NewRatio` 参数的值。许多 JVM 默认为2,这意味着 Old 版本占据堆栈的2/3。它应该足够大,使得该分数超过 `spark.memory.fraction`
* 在打印的 GC 统计信息中,如果 OldGen 接近于满,则通过降低减少用于缓存的内存量 `spark.memory.fraction` ; 缓存较少的对象比减慢任务执行更好。或者,考虑减少年轻一代的大小。这意味着 `-Xmn` 如果您将其设置为如上所述降低。如果没有,请尝试更改 JVM `NewRatio` 参数的值。许多 JVM 默认为 2,这意味着 Old 版本占据堆栈的 2/3。它应该足够大,使得该分数超过 `spark.memory.fraction`
* 尝试使用 G1GC 垃圾回收器 `-XX:+UseG1GC`。在垃圾收集是瓶颈的一些情况下,它可以提高性能。请注意,对于大型 excutor 的堆大小,通过设置 -XX:G1HeapRegionSize 参数来增加 [G1 区域的大小](https://blogs.oracle.com/g1gc/entry/g1_gc_tuning_a_case) 是非常重要的
......@@ -128,15 +128,15 @@ Spark 中 GC 调优的目的是确保只有长寿命的 RDD 存储在 Old 版本
## 并行度水平
集群不会被充分利用,除非您将每个操作的并行级别设置得足够高。自动星火设置的 “地图” 任务的数量根据其大小对每个文件运行(尽管你可以通过可选的参数来控制它 `SparkContext.textFile`,等等),以及用于分布式”减少”操作,如: `groupByKey``reduceByKey`,它采用了最大父 RDD 的分区数。您可以将并行级别作为第二个参数传递(请参阅 [`spark.PairRDDFunctions`](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) 文档),或者将 config 属性设置 `spark.default.parallelism` 为更改默认值。一般来说,我们建议您的群集中每个 CPU 内核有2-3个任务。
集群不会被充分利用,除非您将每个操作的并行级别设置得足够高。自动星火设置的 “地图” 任务的数量根据其大小对每个文件运行(尽管你可以通过可选的参数来控制它 `SparkContext.textFile`,等等),以及用于分布式”减少”操作,如: `groupByKey``reduceByKey`,它采用了最大父 RDD 的分区数。您可以将并行级别作为第二个参数传递(请参阅 [`spark.PairRDDFunctions`](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) 文档),或者将 config 属性设置 `spark.default.parallelism` 为更改默认值。一般来说,我们建议您的群集中每个 CPU 内核有 2 - 3 个任务。
## 减少任务的内存使用
有时,您将得到一个 OutOfMemoryError,因为您的 RDD 不适合内存,而是因为您的其中一个任务的工作集(如其中一个 reduce 任务`groupByKey`)太大。Spark 的 shuffle 操作(`sortByKey``groupByKey``reduceByKey``join`,等)建立每个任务中的哈希表来进行分组,而这往往是很大的。这里最简单的解决方案是_增加并行级别_,以便每个任务的输入集都更小。Spark 可以有效地支持短达200 ms 的任务,因为它可以将多个任务中的一个执行者JVM重用,并且任务启动成本低,因此您可以将并行级别安全地提高到集群中的核心数量。
有时,您将得到一个 OutOfMemoryError,因为您的 RDD 不适合内存,而是因为您的其中一个任务的工作集(如其中一个 reduce 任务`groupByKey`)太大。Spark 的 shuffle 操作(`sortByKey``groupByKey``reduceByKey``join`,等)建立每个任务中的哈希表来进行分组,而这往往是很大的。这里最简单的解决方案是 _增加并行级别_,以便每个任务的输入集都更小。Spark 可以有效地支持短达 200ms 的任务,因为它可以将多个任务中的一个执行者 JVM 重用,并且任务启动成本低,因此您可以将并行级别安全地提高到集群中的核心数量。
## 广播大的变量
使用 可用的[广播功能](programming-guide.html#broadcast-variables) `SparkContext` 可以大大减少每个序列化任务的大小,以及在群集上启动作业的成本。如果您的任务使用其中的驱动程序中的任何大对象(例如:静态查找表),请考虑将其变为广播变量。Spark 打印主机上每个任务的序列化大小,因此您可以查看该任务以决定您的任务是否过大; 一般任务大于20 KB大概值得优化。
使用 可用的[广播功能](programming-guide.html#broadcast-variables) `SparkContext` 可以大大减少每个序列化任务的大小,以及在群集上启动作业的成本。如果您的任务使用其中的驱动程序中的任何大对象(例如:静态查找表),请考虑将其变为广播变量。Spark 打印主机上每个任务的序列化大小,因此您可以查看该任务以决定您的任务是否过大; 一般任务大于 20KB 大概值得优化。
## 数据本地化
......
......@@ -29,7 +29,7 @@ SSL 配置是分级组织的。用户可以对所有的通讯协议配置默认
| Config Namespace(配置空间)| Component(组件)|
| --- | --- |
| `spark.ssl.fs` | 文件下载客户端 (用于从启用了 HTTPS 的服务器中下载 jars 和 files)。 |
| `spark.ssl.fs` | 文件下载客户端(用于从启用了 HTTPS 的服务器中下载 jars 和 files)。 |
| `spark.ssl.ui` | Spark application Web UI |
| `spark.ssl.standalone` | Standalone Master / Worker Web UI |
| `spark.ssl.historyServer` | History Server Web UI |
......
......@@ -26,7 +26,7 @@ Spark 开发者都会遇到一个常见问题,那就是如何为 Spark 配置
# 网络
根据我们的经验,当数据在内存中时,很多 Spark 应用程序跟网络有密切的关系。使用 **10 千兆位**以太网或者更快的网络是让这些应用程序变快的最佳方式。这对于 “distributed reduce” 类的应用程序来说尤其如此,例如 group-by 、reduce-by 和 SQL join。任何程序都可以在应用程序监控 UI 页面 (`http://&lt;driver-node&gt;:4040`) 中查看 Spark 通过网络传输的数据量。
根据我们的经验,当数据在内存中时,很多 Spark 应用程序跟网络有密切的关系。使用 **10 千兆位**以太网或者更快的网络是让这些应用程序变快的最佳方式。这对于 “distributed reduce” 类的应用程序来说尤其如此,例如 group-by 、reduce-by 和 SQL join。任何程序都可以在应用程序监控 UI 页面`http://&lt;driver-node&gt;:4040`中查看 Spark 通过网络传输的数据量。
# CPU Cores
......
......@@ -54,7 +54,7 @@ export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
### build/mvn
Spark 现在与一个独立的 Maven 安装包封装到了一起,以便从位于 `build/` 目录下的源代码构建和部署 Spark。该脚本将自动下载并设置所有必要的构建需求 ([Maven](https://maven.apache.org/)[Scala](http://www.scala-lang.org/)[Zinc](https://github.com/typesafehub/zinc)) 到本身的 `build/` 目录下。其尊重任何已经存在的 `mvn` 二进制文件,但是将会 pull down 其自己的 Scala 和 Zinc 的副本,不管是否满足适当的版本需求。`build/mvn` 的执行作为传递到 `mvn` 的调用,允许从以前的构建方法轻松转换。例如,可以像下边这样构建 Spark 的版本:
Spark 现在与一个独立的 Maven 安装包封装到了一起,以便从位于 `build/` 目录下的源代码构建和部署 Spark。该脚本将自动下载并设置所有必要的构建需求[Maven](https://maven.apache.org/)[Scala](http://www.scala-lang.org/)[Zinc](https://github.com/typesafehub/zinc)到本身的 `build/` 目录下。其尊重任何已经存在的 `mvn` 二进制文件,但是将会 pull down 其自己的 Scala 和 Zinc 的副本,不管是否满足适当的版本需求。`build/mvn` 的执行作为传递到 `mvn` 的调用,允许从以前的构建方法轻松转换。例如,可以像下边这样构建 Spark 的版本:
```
./build/mvn -DskipTests clean package
......
......@@ -5,4 +5,4 @@
* [StackOverflow tag `apache-spark`](http://stackoverflow.com/questions/tagged/apache-spark)
* [Mailing Lists](http://spark.apache.org/mailing-lists.html):在这里询问关于 Spark 的问题
* [AMP Camps](http://ampcamp.berkeley.edu/):在 UC Berkeley(加州大学伯克利分校)的一系列的训练营中,它们的特色是讨论和针对关于 Spark,Spark Streaming,Mesos 的练习,等等。在这里可以免费获取[视频](http://ampcamp.berkeley.edu/6/)[幻灯片](http://ampcamp.berkeley.edu/6/)[练习题](http://ampcamp.berkeley.edu/6/exercises/)
* [Code Examples](http://spark.apache.org/examples.html):更多`示例`可以在 Spark 的子文件夹中获取 ([Scala](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples)[Java](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples)[Python](https://github.com/apache/spark/tree/master/examples/src/main/python)[R](https://github.com/apache/spark/tree/master/examples/src/main/r))
\ No newline at end of file
* [Code Examples](http://spark.apache.org/examples.html):更多`示例`可以在 Spark 的子文件夹中获取([Scala](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples)[Java](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples)[Python](https://github.com/apache/spark/tree/master/examples/src/main/python)[R](https://github.com/apache/spark/tree/master/examples/src/main/r)
\ No newline at end of file
此差异已折叠。
......@@ -6,13 +6,13 @@
* [开始入门](#开始入门)
* [起始点:SparkSession](#起始点-sparksession)
* [创建 DataFrames](#创建-dataframes)
* [无类型的Dataset操作 (aka DataFrame 操作)](#无类型的dataset操作-aka-dataframe-操作)
* [无类型的 Dataset 操作(aka DataFrame 操作)](#无类型的dataset操作-aka-dataframe-操作)
* [Running SQL Queries Programmatically](#running-sql-queries-programmatically)
* [全局临时视图](#全局临时视图)
* [创建Datasets](#创建datasets)
* [RDD的互操作性](#rdd的互操作性)
* [使用反射推断Schema](#使用反射推断schema)
* [以编程的方式指定Schema](#以编程的方式指定schema)
* [创建 Datasets](#创建datasets)
* [RDD 的互操作性](#rdd的互操作性)
* [使用反射推断 Schema](#使用反射推断schema)
* [以编程的方式指定 Schema](#以编程的方式指定schema)
* [Aggregations](#aggregations)
* [Untyped User-Defined Aggregate Functions](#untyped-user-defined-aggregate-functions)
* [Type-Safe User-Defined Aggregate Functions](#type-safe-user-defined-aggregate-functions)
......@@ -58,7 +58,7 @@
* [Java 和 Scala APIs 的统一](#java-和-scala-apis-的统一)
* [隔离隐式转换和删除 dsl 包(仅Scala)](#隔离隐式转换和删除-dsl-包仅scala)
* [针对 DataType 删除在 org.apache.spark.sql 包中的一些类型别名(仅限于 Scala)](#针对-datatype-删除在-orgapachesparksql-包中的一些类型别名仅限于-scala)
* [UDF 注册迁移到 `sqlContext.udf` 中 (Java & Scala)](#udf-注册迁移到-sqlcontextudf-中-java--scala)
* [UDF 注册迁移到 `sqlContext.udf` 中(Java & Scala)](#udf-注册迁移到-sqlcontextudf-中-java--scala)
* [Python DataTypes 不再是 Singletons(单例的)](#python-datatypes-不再是-singletons单例的)
* [与 Apache Hive 的兼容](#与-apache-hive-的兼容)
* [在现有的 Hive Warehouses 中部署](#在现有的-hive-warehouses-中部署)
......@@ -80,7 +80,7 @@ Spark SQL 的功能之一是执行 SQL 查询。Spark SQL 也能够被用于从
## Datasets and DataFrames
一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口,它提供了 RDD 的优点(强类型化,能够使用强大的 lambda 函数)与Spark SQL执行引擎的优点。一个 Dataset 可以从 JVM 对象来 [构造](#creating-datasets) 并且使用转换功能(map,flatMap,filter,等等)。Dataset API 在[Scala](api/scala/index.html#org.apache.spark.sql.Dataset)[Java](api/java/index.html?org/apache/spark/sql/Dataset.html)是可用的。Python 不支持 Dataset API。但是由于 Python 的动态特性,许多 Dataset API 的优点已经可用了 (也就是说,你可能通过 name 天生的`row.columnName`属性访问一行中的字段)。这种情况和 R 相似。
一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口,它提供了 RDD 的优点(强类型化,能够使用强大的 lambda 函数)与Spark SQL执行引擎的优点。一个 Dataset 可以从 JVM 对象来 [构造](#creating-datasets) 并且使用转换功能(map,flatMap,filter,等等)。Dataset API 在[Scala](api/scala/index.html#org.apache.spark.sql.Dataset)[Java](api/java/index.html?org/apache/spark/sql/Dataset.html)是可用的。Python 不支持 Dataset API。但是由于 Python 的动态特性,许多 Dataset API 的优点已经可用了(也就是说,你可能通过 name 天生的`row.columnName`属性访问一行中的字段)。这种情况和 R 相似。
一个 DataFrame 是一个 _Dataset_ 组成的指定列。它的概念与一个在关系型数据库或者在 R/Python 中的表是相等的,但是有很多优化。DataFrames 可以从大量的 [sources](#data-sources) 中构造出来,比如:结构化的文本文件,Hive中的表,外部数据库,或者已经存在的 RDDs。DataFrame API 可以在 Scala,Java,[Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame),和 [R](api/R/index.html)中实现。在 Scala 和 Java中,DataFrame 由 DataSet 中的 `RowS`(多个 Row)来表示。在 [the Scala API](api/scala/index.html#org.apache.spark.sql.Dataset)中,`DataFrame` 仅仅是一个 `Dataset[Row]`类型的别名。然而,在 [Java API](api/java/index.html?org/apache/spark/sql/Dataset.html)中,用户需要去使用 `Dataset&lt;Row&gt;` 去代表一个 `DataFrame`
......@@ -245,7 +245,7 @@ showDF(df)
<small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small>
## 无类型的Dataset操作 (aka DataFrame 操作)
## 无类型的 Dataset 操作(aka DataFrame 操作)
DataFrames 提供了一个特定的语法用在 [Scala](api/scala/index.html#org.apache.spark.sql.Dataset)[Java](api/java/index.html?org/apache/spark/sql/Dataset.html)[Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame) and [R](api/R/SparkDataFrame.html)中机构化数据的操作。
......@@ -1001,9 +1001,9 @@ namesDS.show();
<small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small>
当一个字典不能被提前定义 (例如,记录的结构是在一个字符串中,抑或一个文本中解析,被不同的用户所属),一个 `DataFrame` 可以通过以下3步来创建。
当一个字典不能被提前定义(例如,记录的结构是在一个字符串中,抑或一个文本中解析,被不同的用户所属),一个 `DataFrame` 可以通过以下 3 步来创建。
1. RDD从原始的RDD穿件一个RDD的toples或者一个列表;
1. RDD 从原始的 RDD 穿件一个 RDD 的 toples 或者一个列表;
2. Step 1 被创建后,创建 Schema 表示一个 `StructType` 匹配 RDD 中的结构。
3. 通过 `SparkSession` 提供的 `createDataFrame` 方法应用 Schema 到 RDD。
......@@ -2189,7 +2189,7 @@ SELECT * FROM jsonTable
## Hive 表
Spark SQL 还支持读取和写入存储在 [Apache Hive](http://hive.apache.org/) 中的数据。但是,由于 Hive 具有大量依赖关系,因此这些依赖关系不包含在默认 Spark 分发中。如果在类路径中找到 Hive 依赖项,Spark 将自动加载它们。请注意,这些 Hive 依赖关系也必须存在于所有工作节点上,因为它们将需要访问 Hive 序列化和反序列化库 (SerDes),以访问存储在 Hive 中的数据。
Spark SQL 还支持读取和写入存储在 [Apache Hive](http://hive.apache.org/) 中的数据。但是,由于 Hive 具有大量依赖关系,因此这些依赖关系不包含在默认 Spark 分发中。如果在类路径中找到 Hive 依赖项,Spark 将自动加载它们。请注意,这些 Hive 依赖关系也必须存在于所有工作节点上,因为它们将需要访问 Hive 序列化和反序列化库(SerDes),以访问存储在 Hive 中的数据。
通过将 `hive-site.xml``core-site.xml`(用于安全配置)和 `hdfs-site.xml`(用于 HDFS 配置)文件放在 `conf/` 中来完成配置。
......@@ -2457,7 +2457,7 @@ results <- collect(sql("FROM src SELECT key, value"))
### 指定 Hive 表的存储格式
创建 Hive 表时,需要定义如何 从/向 文件系统 read/write 数据,即 “输入格式” 和 “输出格式”。您还需要定义该表如何将数据反序列化为行,或将行序列化为数据,即 “serde”。以下选项可用于指定存储格式 (“serde”, “input format”, “output format”),例如,`CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`。默认情况下,我们将以纯文本形式读取表格文件。请注意,Hive 存储处理程序在创建表时不受支持,您可以使用 Hive 端的存储处理程序创建一个表,并使用 Spark SQL 来读取它。
创建 Hive 表时,需要定义如何 从/向 文件系统 read/write 数据,即 “输入格式” 和 “输出格式”。您还需要定义该表如何将数据反序列化为行,或将行序列化为数据,即 “serde”。以下选项可用于指定存储格式(“serde”, “input format”, “output format”),例如,`CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`。默认情况下,我们将以纯文本形式读取表格文件。请注意,Hive 存储处理程序在创建表时不受支持,您可以使用 Hive 端的存储处理程序创建一个表,并使用 Spark SQL 来读取它。
| Property Name | Meaning |
| --- | --- |
......@@ -2829,7 +2829,7 @@ Spark SQL CLI 是在本地模式下运行 Hive 转移服务并执行从命令行
#### DataFrame data reader/writer interface
基于用户反馈,我们创建了一个新的更流畅的 API,用于读取 (`SQLContext.read`) 中的数据并写入数据 (`DataFrame.write`),并且旧的 API 将过时(例如,`SQLContext.parquetFile``SQLContext.jsonFile`)。
基于用户反馈,我们创建了一个新的更流畅的 API,用于读取`SQLContext.read`)中的数据并写入数据(`DataFrame.write`,并且旧的 API 将过时(例如,`SQLContext.parquetFile``SQLContext.jsonFile`)。
针对 `SQLContext.read`[Scala](api/scala/index.html#org.apache.spark.sql.SQLContext@read:DataFrameReader)[Java](api/java/org/apache/spark/sql/SQLContext.html#read()),[Python](api/python/pyspark.sql.html#pyspark.sql.SQLContext.read)) 和 `DataFrame.write`[Scala](api/scala/index.html#org.apache.spark.sql.DataFrame@write:DataFrameWriter)[Java](api/java/org/apache/spark/sql/DataFrame.html#write()),[Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame.write))的更多细节,请看 API 文档。
......@@ -2909,7 +2909,7 @@ sqlContext.setConf("spark.sql.retainGroupColumns", "false")
Spark 1.3 移除存在于基本 SQL 包的 `DataType` 类型别名。开发人员应改为导入类 `org.apache.spark.sql.types`
#### UDF 注册迁移到 `sqlContext.udf` 中 (Java & Scala)
#### UDF 注册迁移到 `sqlContext.udf` 中(Java & Scala)
用于注册 UDF 的函数,不管是 DataFrame DSL 还是 SQL 中用到的,都被迁移到 `SQLContext` 中的 udf 对象中。
......@@ -2946,15 +2946,15 @@ Spark SQL 支持绝大部分的 Hive 功能,如:
* `CLUSTER BY`
* `SORT BY`
* 所有 Hive 操作,包括:
* 关系运算符 (`=``⇔``==``&lt;&gt;``&lt;``&gt;``&gt;=``&lt;=`,等等)
* 算术运算符 (`+``-``*``/``%`,等等)
* 逻辑运算符 (`AND``&&``OR``||`,等等)
* 关系运算符`=``⇔``==``&lt;&gt;``&lt;``&gt;``&gt;=``&lt;=`,等等)
* 算术运算符`+``-``*``/``%`,等等)
* 逻辑运算符`AND``&&``OR``||`,等等)
* 复杂类型的构造
* 数学函数 (`sign``ln``cos`,等等)
* String 函数 (`instr``length``printf`,等等)
* 用户定义函数 (UDF)
* 用户定义聚合函数 (UDAF)
* 用户定义 serialization formats (SerDes)
* 数学函数`sign``ln``cos`,等等)
* String 函数`instr``length``printf`,等等)
* 用户定义函数(UDF)
* 用户定义聚合函数(UDAF)
* 用户定义 serialization formats(SerDes)
* 窗口函数
* Joins
* `JOIN`
......@@ -3011,7 +3011,7 @@ Spark SQL 支持绝大部分的 Hive 功能,如:
有少数 Hive 优化还没有包含在 Spark 中。其中一些(比如 indexes 索引)由于 Spark SQL 的这种内存计算模型而显得不那么重要。另外一些在 Spark SQL 未来的版本中会持续跟踪。
* Block 级别的 bitmap indexes 和虚拟 columns (用于构建 indexes)
* Block 级别的 bitmap indexes 和虚拟 columns(用于构建 indexes)
* 自动为 join 和 groupBy 计算 reducer 个数:目前在 Spark SQL 中,你需要使用 “`SET spark.sql.shuffle.partitions=[num_tasks];`” 来控制 post-shuffle 的并行度。
* 仅 Meta-data 的 query:对于只使用 metadata 就能回答的查询,Spark SQL 仍然会启动计算结果的任务。
* Skew data flag:Spark SQL 不遵循 Hive 中 skew 数据的标记。
......@@ -3075,7 +3075,7 @@ import org.apache.spark.sql.types._
**Note(注意):** _valueContainsNull_ 的默认值是 _true_。
| **StructType** | org.apache.spark.sql.Row | StructType(_fields_)
**Note(注意):** _fields_ 是 StructFields 的 Seq。所有,两个 fields 拥有相同的名称是不被允许的。
| **StructField** | 该 field(字段)数据类型的 Scala 中的 value 类型 (例如,数据类型为 IntegerType 的 StructField 是 Int) | StructField(_name_, _dataType_, [_nullable_])
| **StructField** | 该 field(字段)数据类型的 Scala 中的 value 类型(例如,数据类型为 IntegerType 的 StructField 是 Int) | StructField(_name_, _dataType_, [_nullable_])
**Note:** _nullable_ 的默认值是 _true_。
Spark SQL 的所有数据类型都在 `org.apache.spark.sql.types` 的包中。要访问或者创建一个数据类型,请使用 `org.apache.spark.sql.types.DataTypes` 中提供的 factory 方法.
......
......@@ -10,7 +10,7 @@
* [Structural 运算符](#structural-运算符)
* [Join 运算符](#join-运算符)
* [邻域聚合](#邻域聚合)
* [聚合消息 (aggregateMessages)](#聚合消息-aggregatemessages)
* [聚合消息(aggregateMessages)](#聚合消息-aggregatemessages)
* [Map Reduce Triplets Transition Guide (Legacy)](#map-reduce-triplets-transition-guide-legacy)
* [计算级别信息](#计算级别信息)
* [收集相邻点](#收集相邻点)
......@@ -50,7 +50,7 @@ import org.apache.spark.rdd.RDD
[属性 Graph](api/scala/index.html#org.apache.spark.graphx.Graph) 是一个定向多重图形,用户定义的对象附加到每个顶点和边缘。定向多图是具有共享相同源和目标顶点的潜在多个平行边缘的有向图。支持平行边缘的能力简化了在相同顶点之间可以有多个关系(例如: 同事和朋友)的建模场景。每个顶点都由唯一的 64 位长标识符(`VertexId`)键入。GraphX 不对顶点标识符施加任何排序约束。类似地,边缘具有对应的源和目标顶点标识符。
属性图是通过 vertex (`VD`)和 edge (`ED`) 类型进行参数化的。这些是分别与每个顶点和边缘相关联的对象的类型。
属性图是通过 vertex`VD`)和 edge(`ED`类型进行参数化的。这些是分别与每个顶点和边缘相关联的对象的类型。
> 当它们是原始数据类型(例如: int,double 等等)时,GraphX 优化顶点和边缘类型的表示,通过将其存储在专门的数组中来减少内存占用。
......@@ -240,7 +240,7 @@ class Graph[VD, ED] {
这些运算符中的每一个产生一个新的图形,其中顶点或边缘属性被用户定义的 `map` 函数修改。
> 请注意,在每种情况下,图形结构都不受影响。这是这些运算符的一个关键特征,它允许生成的图形重用原始图形的结构索引。以下代码段在逻辑上是等效的,但是第一个代码片段不保留结构索引,并且不会从GraphX系统优化中受益:
> 请注意,在每种情况下,图形结构都不受影响。这是这些运算符的一个关键特征,它允许生成的图形重用原始图形的结构索引。以下代码段在逻辑上是等效的,但是第一个代码片段不保留结构索引,并且不会从 GraphX 系统优化中受益:
```
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
......@@ -279,9 +279,9 @@ class Graph[VD, ED] {
}
```
[`reverse`](api/scala/index.html#org.apache.spark.graphx.Graph@reverse:Graph[VD,ED]) 运算符将返回逆转的所有边缘方向上的新图。这在例如尝试计算逆PageRank时是有用的。由于反向操作不会修改顶点或边缘属性或更改边缘数量,因此可以在没有数据移动或重复的情况下高效地实现。
[`reverse`](api/scala/index.html#org.apache.spark.graphx.Graph@reverse:Graph[VD,ED]) 运算符将返回逆转的所有边缘方向上的新图。这在例如尝试计算逆 PageRank 时是有用的。由于反向操作不会修改顶点或边缘属性或更改边缘数量,因此可以在没有数据移动或重复的情况下高效地实现。
[`subgraph`](api/scala/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]) 操作者需要的顶点和边缘的谓词,并返回包含只有满足谓词顶点的顶点的曲线图(评估为真),并且满足谓词边缘边缘_并连接满足顶点谓词顶点_。所述 `subgraph` 操作员可在情况编号被用来限制图形以顶点和感兴趣的边缘或消除断开的链接。例如,在以下代码中,我们删除了断开的链接:
[`subgraph`](api/scala/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]) 操作者需要的顶点和边缘的谓词,并返回包含只有满足谓词顶点的顶点的曲线图(评估为真),并且满足谓词边缘边缘 _并连接满足顶点谓词顶点_。所述 `subgraph` 操作员可在情况编号被用来限制图形以顶点和感兴趣的边缘或消除断开的链接。例如,在以下代码中,我们删除了断开的链接:
```
// Create an RDD for the vertices
......@@ -379,7 +379,7 @@ val joinedGraph = graph.joinVertices(uniqueCosts,
### 聚合消息 (aggregateMessages)
GraphX 中的核心聚合操作是 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A])。该运算符将用户定义的 `sendMsg` 函数应用于图中的每个_边缘三元组_,然后使用该 `mergeMsg` 函数在其目标顶点聚合这些消息。
GraphX 中的核心聚合操作是 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A])。该运算符将用户定义的 `sendMsg` 函数应用于图中的每个 _边缘三元组_,然后使用该 `mergeMsg` 函数在其目标顶点聚合这些消息。
```
class Graph[VD, ED] {
......@@ -391,7 +391,7 @@ class Graph[VD, ED] {
}
```
用户定义的 `sendMsg` 函数接受一个 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext),它将源和目标属性以及 edge 属性和函数 ([`sendToSrc`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToSrc(msg:A):Unit),和 [`sendToDst`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToDst(msg:A):Unit)) 一起发送到源和目标属性。在 map-reduce 中,将 `sendMsg` 作为 _map_ 函数。用户定义的 `mergeMsg` 函数需要两个发往同一顶点的消息,并产生一条消息。想想 `mergeMsg` 是 map-reduce 中的_reduce_ 函数。[`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 运算符返回一个 `VertexRDD[Msg]`,其中包含去往每个顶点的聚合消息(Msg类型)。没有收到消息的顶点不包括在返回的 `VertexRDD`[VertexRDD](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 中。
用户定义的 `sendMsg` 函数接受一个 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext),它将源和目标属性以及 edge 属性和函数 ([`sendToSrc`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToSrc(msg:A):Unit),和 [`sendToDst`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToDst(msg:A):Unit)) 一起发送到源和目标属性。在 map-reduce 中,将 `sendMsg` 作为 _map_ 函数。用户定义的 `mergeMsg` 函数需要两个发往同一顶点的消息,并产生一条消息。想想 `mergeMsg` 是 map-reduce 中的 _reduce_ 函数。[`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 运算符返回一个 `VertexRDD[Msg]`,其中包含去往每个顶点的聚合消息(Msg类型)。没有收到消息的顶点不包括在返回的 `VertexRDD`[VertexRDD](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 中。
另外,[`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 采用一个可选的`tripletsFields`,它们指示在 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext) 中访问哪些数据(即源顶点属性,而不是目标顶点属性)。`tripletsFields` 定义的可能选项,[`TripletFields`](api/java/org/apache/spark/graphx/TripletFields.html) 默认值是 [`TripletFields.All`](api/java/org/apache/spark/graphx/TripletFields.html#All) 指示用户定义的 `sendMsg` 函数可以访问的任何字段[`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext)。该 `tripletFields` 参数可用于通知 GraphX,只有部分 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext) 需要允许 GraphX 选择优化的连接策略。例如,如果我们计算每个用户的追随者的平均年龄,我们只需要源字段,因此我们将用于 [`TripletFields.Src`](api/java/org/apache/spark/graphx/TripletFields.html#Src) 表示我们只需要源字段。
......@@ -443,7 +443,7 @@ class Graph[VD, ED] {
}
```
`mapReduceTriplets` 操作符接受用户定义的映射函数,该函数应用于每个三元组,并且可以使用用户定义的缩减函数来生成聚合的_消息_。然而,我们发现返回的迭代器的用户是昂贵的,并且它阻止了我们应用其他优化(例如:局部顶点重新编号)的能力。在 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 中,我们引入了 EdgeContext,它暴露了三元组字段,并且还显示了向源和目标顶点发送消息的功能。此外,我们删除了字节码检查,而是要求用户指出三元组中实际需要哪些字段。
`mapReduceTriplets` 操作符接受用户定义的映射函数,该函数应用于每个三元组,并且可以使用用户定义的缩减函数来生成聚合的 _消息_。然而,我们发现返回的迭代器的用户是昂贵的,并且它阻止了我们应用其他优化(例如:局部顶点重新编号)的能力。在 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]) 中,我们引入了 EdgeContext,它暴露了三元组字段,并且还显示了向源和目标顶点发送消息的功能。此外,我们删除了字节码检查,而是要求用户指出三元组中实际需要哪些字段。
以下代码块使用 `mapReduceTriplets`
......@@ -499,17 +499,17 @@ class GraphOps[VD, ED] {
在 Spark 中,默认情况下,RDD 不会保留在内存中。为了避免重新计算,在多次使用它们时,必须明确缓存它们(参见 [Spark Programming Guide](programming-guide.html#rdd-persistence))。GraphX 中的图形表现方式相同。**当多次使用图表时,请务必先调用 [`Graph.cache()`](api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED])。**
在迭代计算中,_uncaching_ 也可能是最佳性能所必需的。默认情况下,缓存的 RDD 和图形将保留在内存中,直到内存压力迫使它们以 LRU 顺序逐出。对于迭代计算,来自先前迭代的中间结果将填满缓存。虽然它们最终被驱逐出来,但存储在内存中的不必要的数据会减慢垃圾收集速度。一旦不再需要中间结果,就会更有效率。这涉及每次迭代实现(缓存和强制)图形或 RDD,取消所有其他数据集,并且仅在将来的迭代中使用实例化数据集。然而,由于图形由多个 RDD 组成,所以很难将它们正确地分开。**对于迭代计算,我们建议使用Pregel API,它可以正确地解析中间结果。**
在迭代计算中,_uncaching_ 也可能是最佳性能所必需的。默认情况下,缓存的 RDD 和图形将保留在内存中,直到内存压力迫使它们以 LRU 顺序逐出。对于迭代计算,来自先前迭代的中间结果将填满缓存。虽然它们最终被驱逐出来,但存储在内存中的不必要的数据会减慢垃圾收集速度。一旦不再需要中间结果,就会更有效率。这涉及每次迭代实现(缓存和强制)图形或 RDD,取消所有其他数据集,并且仅在将来的迭代中使用实例化数据集。然而,由于图形由多个 RDD 组成,所以很难将它们正确地分开。**对于迭代计算,我们建议使用 Pregel API,它可以正确地解析中间结果。**
# Pregel API
图形是固有的递归数据结构,因为顶点的属性取决于其邻居的属性,而邻居的属性又依赖于_其_邻居的属性。因此,许多重要的图算法迭代地重新计算每个顶点的属性,直到达到一个固定点条件。已经提出了一系列图并行抽象来表达这些迭代算法。GraphX 公开了 Pregel API 的变体。
图形是固有的递归数据结构,因为顶点的属性取决于其邻居的属性,而邻居的属性又依赖于 _其_ 邻居的属性。因此,许多重要的图算法迭代地重新计算每个顶点的属性,直到达到一个固定点条件。已经提出了一系列图并行抽象来表达这些迭代算法。GraphX 公开了 Pregel API 的变体。
在高层次上,GraphX 中的 Pregel 运算符是_限制到图形拓扑的_批量同步并行消息抽象。Pregel 操作符在一系列超级步骤中执行,其中顶点接收来自先前超级步骤的入站消息的_总和_,计算顶点属性的新值,然后在下一个超级步骤中将消息发送到相邻顶点。与 Pregel 不同,消息作为边缘三元组的函数并行计算,消息计算可以访问源和目标顶点属性。在超级步骤中跳过不接收消息的顶点。Pregel 运算符终止迭代,并在没有剩余的消息时返回最终的图。
在高层次上,GraphX 中的 Pregel 运算符是 _限制到图形拓扑的_ 批量同步并行消息抽象。Pregel 操作符在一系列超级步骤中执行,其中顶点接收来自先前超级步骤的入站消息的 _总和_,计算顶点属性的新值,然后在下一个超级步骤中将消息发送到相邻顶点。与 Pregel 不同,消息作为边缘三元组的函数并行计算,消息计算可以访问源和目标顶点属性。在超级步骤中跳过不接收消息的顶点。Pregel 运算符终止迭代,并在没有剩余的消息时返回最终的图。
> 注意,与更多的标准 Pregel 实现不同,GraphX 中的顶点只能将消息发送到相邻顶点,并且使用用户定义的消息传递功能并行完成消息构造。这些约束允许在 GraphX 中进行额外优化。
以下是 [Pregel 运算符](api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]) 的类型签名以及 其实现的_草图_(注意:为了避免由于长谱系链引起的 stackOverflowError,pregel 支持周期性检查点图和消息,将 “spark.graphx.pregel.checkpointInterval” 设置为正数,说10。并使用 SparkContext.setCheckpointDir(directory: String)) 设置 checkpoint 目录):
以下是 [Pregel 运算符](api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]) 的类型签名以及 其实现的 _草图_(注意:为了避免由于长谱系链引起的 stackOverflowError,pregel 支持周期性检查点图和消息,将 “spark.graphx.pregel.checkpointInterval” 设置为正数,说10。并使用 SparkContext.setCheckpointDir(directory: String)) 设置 checkpoint 目录):
```
class GraphOps[VD, ED] {
......@@ -601,7 +601,7 @@ object GraphLoader {
1 2
```
`Graph` 从指定的边缘创建一个,自动创建边缘提到的任何顶点。所有顶点和边缘属性默认为1\. `canonicalOrientation` 参数允许在正方向 (`srcId &lt; dstId`) 重新定向边,这是[连接的组件](api/scala/index.html#org.apache.spark.graphx.lib.ConnectedComponents$)算法所要求的。该 `minEdgePartitions` 参数指定要生成的边缘分区的最小数量; 如果例如 HDFS 文件具有更多块,则可能存在比指定更多的边缘分区。
`Graph` 从指定的边缘创建一个,自动创建边缘提到的任何顶点。所有顶点和边缘属性默认为1\. `canonicalOrientation` 参数允许在正方向`srcId &lt; dstId`)重新定向边,这是[连接的组件](api/scala/index.html#org.apache.spark.graphx.lib.ConnectedComponents$)算法所要求的。该 `minEdgePartitions` 参数指定要生成的边缘分区的最小数量;如果例如 HDFS 文件具有更多块,则可能存在比指定更多的边缘分区。
```
object Graph {
......@@ -658,7 +658,7 @@ class VertexRDD[VD] extends RDD[(VertexId, VD)] {
请注意,例如,`filter` 运算符 如何返回 [`VertexRDD`](api/scala/index.html#org.apache.spark.graphx.VertexRDD)。过滤器实际上是通过 `BitSet` 使用索引重新实现的,并保留与其他`VertexRDD` 进行快速连接的能力。同样,`mapValues` 运算符不允许 `map` 功能改变,`VertexId` 从而使相同的 `HashMap` 数据结构能够被重用。无论是 `leftJoin``innerJoin` 能够连接两个时识别 `VertexRDD` 来自同一来源的小号 `HashMap` 和落实线性扫描,而不是昂贵的点查找的加入。
`aggregateUsingIndex` 运算符对于从 `RDD[(VertexId, A)]` 有效构建新的 [`VertexRDD`](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 非常有用。在概念上,如果我在一组顶点上构造了一个 `VertexRDD[B]`,这是一些 `RDD[(VertexId, A)]` 中的顶点的_超集_,那么我可以重用索引来聚合然后再索引 `RDD[(VertexId, A)]`。例如:
`aggregateUsingIndex` 运算符对于从 `RDD[(VertexId, A)]` 有效构建新的 [`VertexRDD`](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 非常有用。在概念上,如果我在一组顶点上构造了一个 `VertexRDD[B]`,这是一些 `RDD[(VertexId, A)]` 中的顶点的 _超集_,那么我可以重用索引来聚合然后再索引 `RDD[(VertexId, A)]`。例如:
```
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
......@@ -761,7 +761,7 @@ println(ccByUsername.collect().mkString("\n"))
## Triangle 计数
顶点是三角形的一部分,当它有两个相邻的顶点之间有一个边。GraphX 在 [`TriangleCount 对象`](api/scala/index.html#org.apache.spark.graphx.lib.TriangleCount$) 中实现一个三角计数算法,用于确定通过每个顶点的三角形数量,提供聚类度量。我们从 [PageRank 部分](#pagerank) 计算社交网络数据集的三角形数。_需要注意的是 `TriangleCount` 边缘要处于规范方向 (`srcId &lt; dstId`),而图形要使用 [`Graph.partitionBy`](api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED])。_
顶点是三角形的一部分,当它有两个相邻的顶点之间有一个边。GraphX 在 [`TriangleCount 对象`](api/scala/index.html#org.apache.spark.graphx.lib.TriangleCount$) 中实现一个三角计数算法,用于确定通过每个顶点的三角形数量,提供聚类度量。我们从 [PageRank 部分](#pagerank) 计算社交网络数据集的三角形数。_需要注意的是 `TriangleCount` 边缘要处于规范方向`srcId &lt; dstId`,而图形要使用 [`Graph.partitionBy`](api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED])。_
```
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册