Commit 98570530 authored by Matei Zaharia, committed by Aaron Davidson

SPARK-2684: Update ExternalAppendOnlyMap to take an iterator as input

This will decrease object allocation from the "update" closure used in map.changeValue.

Author: Matei Zaharia <matei@databricks.com>

Closes #1607 from mateiz/spark-2684 and squashes the following commits:

b7d89e6 [Matei Zaharia] Add insertAll for Iterables too, and fix some code style
561fc97 [Matei Zaharia] Update ExternalAppendOnlyMap to take an iterator as input
Parent 3a69c72e
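
The change can be read as follows: previously callers looped over their input and called insert(key, value) once per pair, and insert built a fresh `update` closure (capturing `value`) on every call; with insertAll, one closure is allocated up front and reads the current pair through a mutable `curEntry` variable. Below is a minimal, self-contained sketch of that pattern, not Spark code: `SimpleMap` and its `changeValue` method are hypothetical stand-ins for Spark's AppendOnlyMap, used only to contrast per-pair closure allocation with a single reused closure.

import scala.collection.mutable

// Hypothetical stand-in for Spark's AppendOnlyMap: changeValue looks up `key`
// and applies updateFunc(hadValue, oldValue) to produce the stored value.
class SimpleMap[K, C] {
  private val data = mutable.HashMap.empty[K, C]
  def changeValue(key: K, updateFunc: (Boolean, C) => C): C = {
    val newValue = updateFunc(data.contains(key), data.getOrElse(key, null.asInstanceOf[C]))
    data(key) = newValue
    newValue
  }
  def iterator: Iterator[(K, C)] = data.iterator
}

object ClosureAllocationSketch extends App {
  val pairs = Seq(("a", 1), ("b", 2), ("a", 3))

  // Old pattern: the closure captures `value`, so one new function object is
  // allocated for every inserted pair.
  val before = new SimpleMap[String, Int]
  for ((key, value) <- pairs) {
    val update: (Boolean, Int) => Int =
      (hadVal, oldVal) => if (hadVal) oldVal + value else value
    before.changeValue(key, update)
  }

  // New pattern (what insertAll does in this commit): a single closure is
  // allocated once and reads the current pair through a mutable variable.
  val after = new SimpleMap[String, Int]
  var curValue = 0
  val update: (Boolean, Int) => Int =
    (hadVal, oldVal) => if (hadVal) oldVal + curValue else curValue
  for ((key, value) <- pairs) {
    curValue = value
    after.changeValue(key, update)
  }

  println(before.iterator.toMap) // Map(a -> 4, b -> 2)
  println(after.iterator.toMap)  // Map(a -> 4, b -> 2)
}

Both patterns produce the same combined result; only the number of allocated function objects differs, which is the saving the commit message refers to.
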
@@ -55,10 +55,7 @@ case class Aggregator[K, V, C] (
       combiners.iterator
     } else {
       val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-      while (iter.hasNext) {
-        val pair = iter.next()
-        combiners.insert(pair._1, pair._2)
-      }
+      combiners.insertAll(iter)
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
       Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
@@ -154,11 +154,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
         map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     } else {
       val map = createExternalMap(numRdds)
-      rddIterators.foreach { case (it, depNum) =>
-        while (it.hasNext) {
-          val kv = it.next()
-          map.insert(kv._1, new CoGroupValue(kv._2, depNum))
-        }
+      for ((it, depNum) <- rddIterators) {
+        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
       }
       context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
       context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
@@ -110,42 +110,69 @@ class ExternalAppendOnlyMap[K, V, C](
   /**
    * Insert the given key and value into the map.
+   */
+  def insert(key: K, value: V): Unit = {
+    insertAll(Iterator((key, value)))
+  }
+
+  /**
+   * Insert the given iterator of keys and values into the map.
    *
-   * If the underlying map is about to grow, check if the global pool of shuffle memory has
+   * When the underlying map needs to grow, check if the global pool of shuffle memory has
    * enough room for this to happen. If so, allocate the memory required to grow the map;
    * otherwise, spill the in-memory map to disk.
    *
    * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
    */
-  def insert(key: K, value: V) {
+  def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
+    // An update function for the map that we reuse across entries to avoid allocating
+    // a new closure each time
+    var curEntry: Product2[K, V] = null
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
-      if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+      if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
     }
-    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
-      val mapSize = currentMap.estimateSize()
-      var shouldSpill = false
-      val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
-
-      // Atomically check whether there is sufficient memory in the global pool for
-      // this map to grow and, if possible, allocate the required amount
-      shuffleMemoryMap.synchronized {
-        val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
-        val availableMemory = maxMemoryThreshold -
-          (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
-
-        // Assume map growth factor is 2x
-        shouldSpill = availableMemory < mapSize * 2
-        if (!shouldSpill) {
-          shuffleMemoryMap(threadId) = mapSize * 2
+
+    while (entries.hasNext) {
+      curEntry = entries.next()
+      if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
+        val mapSize = currentMap.estimateSize()
+        var shouldSpill = false
+        val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+        // Atomically check whether there is sufficient memory in the global pool for
+        // this map to grow and, if possible, allocate the required amount
+        shuffleMemoryMap.synchronized {
+          val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+          val availableMemory = maxMemoryThreshold -
+            (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
+
+          // Assume map growth factor is 2x
+          shouldSpill = availableMemory < mapSize * 2
+          if (!shouldSpill) {
+            shuffleMemoryMap(threadId) = mapSize * 2
+          }
+        }
+        // Do not synchronize spills
+        if (shouldSpill) {
+          spill(mapSize)
         }
       }
-      // Do not synchronize spills
-      if (shouldSpill) {
-        spill(mapSize)
-      }
+      currentMap.changeValue(curEntry._1, update)
+      numPairsInMemory += 1
     }
-    currentMap.changeValue(key, update)
-    numPairsInMemory += 1
+  }
+
+  /**
+   * Insert the given iterable of keys and values into the map.
+   *
+   * When the underlying map needs to grow, check if the global pool of shuffle memory has
+   * enough room for this to happen. If so, allocate the memory required to grow the map;
+   * otherwise, spill the in-memory map to disk.
+   *
+   * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
+   */
+  def insertAll(entries: Iterable[Product2[K, V]]): Unit = {
+    insertAll(entries.iterator)
   }
 
   /**
@@ -63,12 +63,13 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
       mergeValue, mergeCombiners)
-    map.insert(1, 10)
-    map.insert(2, 20)
-    map.insert(3, 30)
-    map.insert(1, 100)
-    map.insert(2, 200)
-    map.insert(1, 1000)
+    map.insertAll(Seq(
+      (1, 10),
+      (2, 20),
+      (3, 30),
+      (1, 100),
+      (2, 200),
+      (1, 1000)))
     val it = map.iterator
     assert(it.hasNext)
     val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
@@ -282,7 +283,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       assert(w1.hashCode === w2.hashCode)
     }
-    (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
+    map.insertAll((1 to 100000).iterator.map(_.toString).map(i => (i, i)))
     collisionPairs.foreach { case (w1, w2) =>
       map.insert(w1, w2)
       map.insert(w2, w1)
@@ -355,7 +356,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
       createCombiner, mergeValue, mergeCombiners)
-    (1 to 100000).foreach { i => map.insert(i, i) }
+    map.insertAll((1 to 100000).iterator.map(i => (i, i)))
     map.insert(null.asInstanceOf[Int], 1)
    map.insert(1, null.asInstanceOf[Int])
    map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int])