Commit 47b6b38c authored by jerryshao, committed by Matei Zaharia

[SPARK-2125] Add sort flag and move sort into shuffle implementations

This patch adds a sort flag to ShuffleDependency and moves sorting into the hash shuffle implementation.

Moving the sort into the shuffle implementation leaves room for other shuffle implementations (such as sort-based shuffle) to optimize sorting as part of the shuffle itself.
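To make the mechanism concrete, here is a minimal, self-contained sketch of the data flow this patch sets up. The names (`SortFlagFlow`, `ShuffleDep`, `planSort`, `readSorted`) are simplified stand-ins, not Spark's actual classes: the map side only records the requested order on the dependency, and the shuffle reader applies it after fetching.

```scala
object SortFlagFlow {
  object SortOrder extends Enumeration { val ASCENDING, DESCENDING = Value }

  // Simplified stand-in for ShuffleDependency's new optional sort fields.
  final case class ShuffleDep[K](keyOrdering: Option[Ordering[K]],
                                 sortOrder: Option[SortOrder.Value])

  // Map side: translate the boolean API flag into the enum; no sorting here.
  def planSort[K](ascending: Boolean)(implicit ord: Ordering[K]): ShuffleDep[K] =
    ShuffleDep(Some(ord),
      Some(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING))

  // Reduce side: the reader sorts only when both options are defined.
  def readSorted[K, V](dep: ShuffleDep[K], fetched: Seq[(K, V)]): Seq[(K, V)] =
    (dep.sortOrder, dep.keyOrdering) match {
      case (Some(SortOrder.ASCENDING), Some(ord))  => fetched.sortBy(_._1)(ord)
      case (Some(SortOrder.DESCENDING), Some(ord)) => fetched.sortBy(_._1)(ord.reverse)
      case _                                       => fetched
    }

  def main(args: Array[String]): Unit = {
    val dep = planSort[String](ascending = false)
    println(readSorted(dep, Seq("a" -> 1, "c" -> 3, "b" -> 2)))
    // List((c,3), (b,2), (a,1))
  }
}
```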

Author: jerryshao <saisai.shao@intel.com>

Closes #1210 from jerryshao/SPARK-2125 and squashes the following commits:

2feaf7b [jerryshao] revert MimaExcludes
ceddf75 [jerryshao] add MimaExeclude
f674ff4 [jerryshao] Add missing Scope restriction
b9fe0dd [jerryshao] Fix some style issues according to comments
ef6b729 [jerryshao] Change sort flag into Option
3f6eeed [jerryshao] Fix issues related to unit test
2f552a5 [jerryshao] Minor changes about naming and order
c92a281 [jerryshao] Move sort into shuffle implementations
Parent ab3c6a45
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.SortOrder.SortOrder
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle
 
@@ -62,7 +63,8 @@ class ShuffleDependency[K, V, C](
     val serializer: Option[Serializer] = None,
     val keyOrdering: Option[Ordering[K]] = None,
     val aggregator: Option[Aggregator[K, V, C]] = None,
-    val mapSideCombine: Boolean = false)
+    val mapSideCombine: Boolean = false,
+    val sortOrder: Option[SortOrder] = None)
   extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
 
   val shuffleId: Int = rdd.context.newShuffleId()
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -57,14 +57,13 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
    */
   def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
     val part = new RangePartitioner(numPartitions, self, ascending)
-    val shuffled = new ShuffledRDD[K, V, V, P](self, part).setKeyOrdering(ordering)
-    shuffled.mapPartitions(iter => {
-      val buf = iter.toArray
-      if (ascending) {
-        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
-      } else {
-        buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
-      }
-    }, preservesPartitioning = true)
+    new ShuffledRDD[K, V, V, P](self, part)
+      .setKeyOrdering(ordering)
+      .setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING)
   }
 }
+
+private[spark] object SortOrder extends Enumeration {
+  type SortOrder = Value
+  val ASCENDING, DESCENDING = Value
+}
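For callers nothing changes: sortByKey keeps its signature, and only the place where the comparison runs moves into the shuffle reader. A hypothetical usage sketch, assuming local mode and the pair-RDD implicits of this Spark era:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits (pre Spark 1.3 style)

object SortByKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sort-demo").setMaster("local"))
    val pairs = sc.parallelize(Seq("b" -> 2, "a" -> 1, "c" -> 3))
    // Same public API; the sort itself now happens in the shuffle reader.
    println(pairs.sortByKey(ascending = false).collect().toSeq)
    // prints the pairs in descending key order: (c,3), (b,2), (a,1)
    sc.stop()
  }
}
```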
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.SortOrder.SortOrder
 import org.apache.spark.serializer.Serializer
 
 private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -51,6 +52,8 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
 
   private var mapSideCombine: Boolean = false
 
+  private var sortOrder: Option[SortOrder] = None
+
   /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
   def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
     this.serializer = Option(serializer)
@@ -75,8 +78,15 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
     this
   }
 
+  /** Set sort order for RDD's sorting. */
+  def setSortOrder(sortOrder: SortOrder): ShuffledRDD[K, V, C, P] = {
+    this.sortOrder = Option(sortOrder)
+    this
+  }
+
   override def getDependencies: Seq[Dependency[_]] = {
-    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
+    List(new ShuffleDependency(prev, part, serializer,
+      keyOrdering, aggregator, mapSideCombine, sortOrder))
   }
 
   override val partitioner = Some(part)
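Note that `setSortOrder`, like the other setters, returns `this`, the fluent-builder style the rewritten sortByKey relies on (`new ShuffledRDD(...).setKeyOrdering(...).setSortOrder(...)`). A generic sketch of that pattern; `Config` and its fields are hypothetical, not Spark classes:

```scala
class Config {
  private var name: Option[String] = None
  private var retries: Int = 0

  // Each setter mutates state and returns `this` so calls can be chained.
  def setName(n: String): Config = { this.name = Option(n); this }
  def setRetries(r: Int): Config = { this.retries = r; this }

  override def toString = s"Config(name=$name, retries=$retries)"
}

object BuilderDemo {
  def main(args: Array[String]): Unit =
    println(new Config().setName("shuffle").setRetries(3))
  // Config(name=Some(shuffle), retries=3)
}
```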
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.hash
 
 import org.apache.spark.{InterruptibleIterator, TaskContext}
+import org.apache.spark.rdd.SortOrder
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
 
@@ -38,7 +39,7 @@ class HashShuffleReader[K, C](
     val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context,
       Serializer.getSerializer(dep.serializer))
 
-    if (dep.aggregator.isDefined) {
+    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
         new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
       } else {
@@ -49,6 +50,17 @@ class HashShuffleReader[K, C](
     } else {
       iter
     }
+
+    val sortedIter = for (sortOrder <- dep.sortOrder; ordering <- dep.keyOrdering) yield {
+      val buf = aggregatedIter.toArray
+      if (sortOrder == SortOrder.ASCENDING) {
+        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
+      } else {
+        buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
+      }
+    }
+
+    sortedIter.getOrElse(aggregatedIter)
   }
 
   /** Close this reader */
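The reader's `for (sortOrder <- dep.sortOrder; ordering <- dep.keyOrdering) yield ...` produces a sorted iterator only when both options are defined; otherwise `getOrElse` falls through to the unsorted data. A standalone sketch of that pattern in plain Scala, with no Spark types (`maybeSort` and its `Option[Boolean]` flag are illustrative stand-ins for the enum):

```scala
object MaybeSort {
  // Sort only when both an order and an Ordering are supplied,
  // mirroring dep.sortOrder / dep.keyOrdering in the reader above.
  def maybeSort[K, V](iter: Iterator[(K, V)],
                      ascending: Option[Boolean],
                      keyOrdering: Option[Ordering[K]]): Iterator[(K, V)] = {
    val sorted = for (asc <- ascending; ord <- keyOrdering) yield {
      val buf = iter.toArray  // must buffer: an iterator cannot be sorted lazily
      val effective = if (asc) ord else ord.reverse
      buf.sortBy(_._1)(effective).iterator
    }
    sorted.getOrElse(iter)  // a None anywhere => pass data through unsorted
  }

  def main(args: Array[String]): Unit = {
    println(maybeSort(Iterator("b" -> 2, "a" -> 1), Some(true), Some(Ordering.String)).toList)
    // List((a,1), (b,2))
    println(maybeSort(Iterator("b" -> 2, "a" -> 1), None, Some(Ordering.String)).toList)
    // List((b,2), (a,1)) -- unsorted passthrough
  }
}
```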