Accumulators(累加器)是一个仅可以执行 “added”(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter( 计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且程序员可以添加新的支持类型。
创建 accumulators(累加器)并命名之后,在 Spark 的 UI 界面上将会显示它。这样可以帮助理解正在运行的阶段的运行情况(注意 : 该特性在 Python 中还不支持)。
可以通过调用 SparkContext.longAccumulator()
或 SparkContext.doubleAccumulator()
方法创建数值类型的 accumulator(累加器)以分别累加 Long 或 Double 类型的值。
集群上正在运行的任务就可以使用 add 方法来累计数值
。然而,它们不能够读取它的值。只有 driver program(驱动程序)才可以使用 value 方法读取累加器的值。
下面的代码展示了一个 accumulator(累加器)被用于对一个数字中的元素求和。
scala> val accum = sc.longAccumulator("My Accumulator") accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Long = 10
上面的代码示例使用的是 Spark 内置的 Long 类型的累加器,程序员可以通过继承 AccumulatorV2 类创建新的累加器类型。AccumulatorV2 抽象类有几个需要 override(重写)的方法 : reset 方法可将累加器重置为 0,add 方法可将其它值添加到累加器中,merge 方法可将其他同样类型的累加器合并为一个。其他需要重写的方法可参考 scala API 文档。 例如,假设我们有一个表示数学上 vectors(向量)的 My
Vector
类,我们可以写成 :
object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] { val vec_ : MyVector = MyVector.createZeroVector def reset(): MyVector = { vec_.reset() } def add(v1: MyVector, v2: MyVector): MyVector = { vec_.add(v2) } ... } // Then, create an Accumulator of this type: val myVectorAcc = new VectorAccumulatorV2 // Then, register it into spark context: sc.register(myVectorAcc, "MyVectorAcc1")
注意,在开发者定义自己的 AccumulatorV2 类型时, resulting type(返回值类型)可能与添加的元素的类型不一致。
累加器的更新只发生在 action 操作中,Spark 保证每个任务只更新累加器一次,例如,重启任务不会更新值。在 transformations(转换)中, 用户需要注意的是,如果 task(任务)或 job stages(阶段)重新执行,每个任务的更新操作可能会执行多次。
累加器不会改变 Spark lazy evaluation(懒加载)的模式。如果累加器在 RDD 中的一个操作中进行更新,它们的值仅被更新一次,RDD 被作为 action 的一部分来计算。因此,在一个像 map()
这样的 transformation(转换)时,累加器的更新并没有执行。下面的代码片段证明了这个特性 :
val accum = sc.accumulator(0) data.map { x => accum += x; x } // 在这里,accus 仍然为 0, 因为没有 actions(动作)来让 map 操作被计算。