本教程提供了如何使用 Spark 的简要介绍。首先通过运行 Spark 交互式的 shell(在 Python 或 Scala 中)来介绍 API,然后展示如何使用 Java ,Scala 和 Python 来编写应用程序。更多信息请参考 Spark 编程指南。
为了继续阅读本指南,首先从 Spark 官网 下载 Spark 的发行包。因为我们将不使用 HDFS,所以你可以下载一个任何 Hadoop 版本的软件包。
Spark shell 提供了一种来学习该 API 比较简单的方式,以及一个来分析数据交互的强大的工具。在 Scala(运行于 Java 虚拟机之上,并能很好的调用已存在的 Java 类库)或者 Python 中它是可用的。通过在 Spark 目录中运行以下的命令来启动它 :
./bin/spark-shell
Spark 的主要抽象是一个称为弹性分布式数据集(RDD)的分布式的 item 集合。RDD 可以从 Hadoop 的 InputFormats(例如 HDFS文件)或者通过其它 RDD 的转换来创建。让我们从源目录中的 README 文件中的文本创建一个新的 RDD :
scala> val textFile = sc.textFile("README.md") textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:25
RDD 有可以返回值的 actions(动作),还有可以返回指定的新 RDD 的 transformations(转换)。让我们启动一个新的 actions(动作) :
scala> textFile.count() // 在这个 RDD 中 items 的数量 res0: Long = 126 scala> textFile.first() // 在这个 RDD 中的第一个 item res1: String = # Apache Spark
现在让我们使用一个 transformation(转换)。我们将使用 filter
transaction(转换)来返回一个新的 RDD(文件中 item 的一个子集)。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27
我们可以链式操作 transformation(转换) 和 action(动作)。
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15
RDD actions(操作)和 transformations(转换)可以用于更复杂的计算。例如,统计出现次数最多的单词 :
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15
第一个 map 操作创建一个新的 RDD,将一行数据 map 为一个整型值。reduce
RDD 找到最大的行计数。参数 map
与 reduce
是 Python 的 匿名函数(lambda表达式),但我们也可以通过我们想要的任何顶级的 Python 功能。例如,我们将定义一个 max
函数来使代码更易于理解 :
scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15
一种常见的数据流模式是被 Hadoop 所推广的 MapReduce。Spark 可以很容易实现 MapReduce :
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28
在这里,我们结合了 flatMap
,map
和 reduceByKey
transformations(转换)来计算文件中每一个单词的数量作为一个(string,int)的 RDD pairs(对)。对每个单词计数。为了在我们的 shell 中统计单词出现的次数,我们可以使用 collect action(
动作):
>>> wordCounts.collect() [(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1), (u'"local"', 1), (u'variable', 1), ...]
Spark 还支持 Pulling(拉取)数据集到一个群集范围的内存缓存中。例如当查询一个小的 “hot” 数据集或运行一个像 PageRANK 这样的迭代算法时,在数据被重复访问时是非常高效的。举一个简单的例子,让我们标记我们的 linesWithSpark
数据集到缓存中 :
scala> linesWithSpark.cache() res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27 scala> linesWithSpark.count() res8: Long = 19 scala> linesWithSpark.count() res9: Long = 19
使用 Spark 来探索和缓存一个 100 行的文本文件看起来比较愚蠢。有趣的是,即使在他们跨越几十或者几百个节点时,这些相同的函数也可以用于非常大的数据集。您也可以像 编程指南 中描述的一样通过连接 bin/spark-shell 到集群中,使用交互式的方式来做这件事情。
假设我们希望使用 Spark API 来创建一个独立的应用程序。我们在 Scala(SBT),Java(Maven)和 Python 中练习一个简单应用程序。
我们将在 Scala 中创建一个非常简单的 Spark 应用程序 - 很简单的,事实上,它名为 SimpleApp.scala :
/* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } }
注意这个应用程序我们应该定义一个 main() 方法而不是去扩展 scala.App
。使用 scala.App
的子类可能不会正常运行。
该程序仅仅统计了 Spark README 文件中每一行包含 ‘a’ 的数量和包含 ‘b’ 的数量。注意,您需要将 YOUR_SPARK_HOME 替换为您 Spark 安装的位置。不像先前使用 spark shell 操作的示例,它们初始化了它们自己的 SparkContext,我们初始化了一个 SparkContext 作为应用程序的一部分。
我们传递给了 SparkContext 构造器一个包含我们应用程序信息的 SparkConf 对象。
我们的应用依赖了 Spark API,所以我们将包含一个名为 simple.sbt 的 sbt 配置文件,它说明了 Spark 是一个依赖。
该文件也添加了一个 Spark 依赖的仓库 :
name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"
为了让 sbt 正常的运行,我们需要根据经典的目录结构来布局 SimpleApp.scala
和 simple.sbt
文件。在成功后,我们可以创建一个包含应用程序代码的 JAR 包,然后使用 spark-submit 脚本来运行我们的程序。
# Your directory layout should look like this $ find . . ./simple.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.11/simple-project_2.11-1.0.jar ... Lines with a: 46, Lines with b: 23
恭喜您成功的运行了您的第一个 Spark 应用程序!
# 针对 Scala 和 Java, 使用 run-example : ./bin/run-example SparkPi # 针对 Python 示例, 直接使用 spark-submit : ./bin/spark-submit examples/src/main/python/pi.py # 针对 R 示例,直接使用 spark-submit : ./bin/spark-submit examples/src/main/r/dataframe.R