提交 3f511953 编写于 作者: H Henry Saputra 提交者: Aljoscha Krettek

First cleanup attempt, mostly on Scala code, to follow guidelines from...

First cleanup attempt, mostly on Scala code, to follow guidelines from http://docs.scala-lang.org/style:
1. Remove return statements from Scala code where they are not necessary (at the end of a method definition)
2. Remove extra ; from Scala and Java code
3. First pass at keeping Scala code within 100 characters per line.
   I will send further PRs for other files as I encounter them.
4. Remove parentheses from empty-argument methods that have no side effects
   (see http://www.artima.com/pins1ed/composition-and-inheritance.html#i1343251059-1)
5. Remove unused import statements in Scala code as I encounter them.

This is the first pass at refactoring and cleaning up the Scala code, intended to gather comments and responses from the community.
More PRs with follow-up changes will come.

Close #64
上级 59287062
......@@ -33,7 +33,7 @@ class WordCount extends Program with ProgramDescription with Serializable {
val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } map { (_, 1) } }
val counts = words groupBy { case (word, _) => word } reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
val output = counts.write(wordsOutput, CsvOutputFormat("\n", " "));
val output = counts.write(wordsOutput, CsvOutputFormat("\n", " "))
val plan = new ScalaPlan(Seq(output), "Word Count")
plan.setDefaultParallelism(numSubTasks)
......
......@@ -41,8 +41,8 @@ object RunJobRemote {
// You will also need to change the name of the jar if you change the
// project name and/or version. Before running this you also need
// to run "mvn package" to create the jar.
val ex = new RemoteExecutor("localhost", 6123, "target/stratosphere-project-0.1-SNAPSHOT.jar");
ex.executePlan(plan);
val ex = new RemoteExecutor("localhost", 6123, "target/stratosphere-project-0.1-SNAPSHOT.jar")
ex.executePlan(plan)
}
}
......
......@@ -426,7 +426,7 @@ public class HashMatchIteratorITCase {
while (iterator.callWithNextKey(matcher, collector));
iterator.close();;
iterator.close();
// assert that each expected match was seen
for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
......@@ -473,7 +473,7 @@ public class HashMatchIteratorITCase {
while (iterator.callWithNextKey(matcher, collector));
iterator.close();;
iterator.close();
// assert that each expected match was seen
for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
......
......@@ -69,7 +69,7 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 8);;
target.write(source, 8);
}
public static final class IntPairSerializerFactory implements TypeSerializerFactory<IntPair> {
......
......@@ -121,7 +121,7 @@ object AnnotationUtil {
val fieldSet = new FieldSet(notConstantSet1Annotation.value(): _*)
for (i <- 0 until scalaOp.getUDF().getOutputLength) {
for (i <- 0 until scalaOp.getUDF.getOutputLength) {
if (!fieldSet.contains(i)) {
properties.addForwardedField1(i, i)
}
......@@ -141,7 +141,7 @@ object AnnotationUtil {
val fieldSet = new FieldSet(notConstantSet2Annotation.value(): _*)
for (i <- 0 until scalaOp.getUDF().getOutputLength) {
for (i <- 0 until scalaOp.getUDF.getOutputLength) {
if (!fieldSet.contains(i)) {
properties.addForwardedField2(i, i)
}
......@@ -166,10 +166,10 @@ object AnnotationUtil {
}
// get constantSet annotation from stub
val constantSet: FunctionAnnotation.ConstantFields = scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation
.ConstantFields])
val notConstantSet: FunctionAnnotation.ConstantFieldsExcept = scalaOp.getUserCodeAnnotation(
classOf[FunctionAnnotation.ConstantFieldsExcept])
val constantSet: FunctionAnnotation.ConstantFields =
scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation.ConstantFields])
val notConstantSet: FunctionAnnotation.ConstantFieldsExcept =
scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation.ConstantFieldsExcept])
if (notConstantSet != null && constantSet != null) {
throw new RuntimeException("Either ConstantFields or ConstantFieldsExcept can be specified, not both.")
......@@ -186,7 +186,7 @@ object AnnotationUtil {
}
}
for (i <- 0 until scalaOp.getUDF().getOutputLength) {
for (i <- 0 until scalaOp.getUDF.getOutputLength) {
if (!nonConstant.contains(i)) {
properties.addForwardedField(i, i)
}
......
......@@ -15,24 +15,30 @@ package eu.stratosphere.api.scala
import language.experimental.macros
import scala.util.DynamicVariable
import scala.reflect.macros.Context
import eu.stratosphere.api.scala.analysis._
import eu.stratosphere.api.scala.operators.Annotations
import eu.stratosphere.api.common.operators.util.{ FieldSet => PactFieldSet }
import eu.stratosphere.api.common.operators.Operator
import eu.stratosphere.api.scala.codegen.MacroContextHolder
import scala.reflect.macros.Context
import eu.stratosphere.types.Record
case class KeyCardinality(key: FieldSelector, isUnique: Boolean, distinctCount: Option[Long], avgNumRecords: Option[Float]) {
case class KeyCardinality(
key: FieldSelector,
isUnique: Boolean,
distinctCount: Option[Long],
avgNumRecords: Option[Float]) {
@transient private var pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
@transient private var pactFieldSets =
collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
def getPactFieldSet(contract: Operator[Record] with ScalaOperator[_, _]): PactFieldSet = {
if (pactFieldSets == null)
pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
if (pactFieldSets == null) {
pactFieldSets =
collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
}
val keyCopy = key.copy
val keyCopy = key.copy()
contract.getUDF.attachOutputsToInputs(keyCopy.inputFields)
val keySet = keyCopy.selectedFields.toIndexSet.toArray
......@@ -57,27 +63,40 @@ trait OutputHintable[Out] { this: DataSet[Out] =>
def outputSize = contract.getCompilerHints().getOutputSize()
def outputSize_=(value: Long) = contract.getCompilerHints().setOutputSize(value)
def outputSize(value: Long): this.type = { contract.getCompilerHints().setOutputSize(value); this }
def outputSize(value: Long): this.type = {
contract.getCompilerHints().setOutputSize(value)
this
}
def outputCardinality = contract.getCompilerHints().getOutputCardinality()
def outputCardinality_=(value: Long) = contract.getCompilerHints().setOutputCardinality(value)
def outputCardinality(value: Long): this.type = { contract.getCompilerHints().setOutputCardinality(value); this }
def outputCardinality(value: Long): this.type = {
contract.getCompilerHints().setOutputCardinality(value)
this
}
def avgBytesPerRecord = contract.getCompilerHints().getAvgOutputRecordSize()
def avgBytesPerRecord_=(value: Float) = contract.getCompilerHints().setAvgOutputRecordSize(value)
def avgBytesPerRecord(value: Float): this.type = { contract.getCompilerHints().setAvgOutputRecordSize(value); this }
def avgBytesPerRecord(value: Float): this.type = {
contract.getCompilerHints().setAvgOutputRecordSize(value)
this
}
def filterFactor = contract.getCompilerHints().getFilterFactor()
def filterFactor_=(value: Float) = contract.getCompilerHints().setFilterFactor(value)
def filterFactor(value: Float): this.type = { contract.getCompilerHints().setFilterFactor(value); this }
def filterFactor(value: Float): this.type = {
contract.getCompilerHints().setFilterFactor(value)
this
}
def uniqueKey[Key](fields: Out => Key) = macro OutputHintableMacros.uniqueKey[Out, Key]
def applyHints(contract: Operator[Record] with ScalaOperator[_, _]): Unit = {
val hints = contract.getCompilerHints
if (hints.getUniqueFields != null)
if (hints.getUniqueFields != null) {
hints.getUniqueFields.clear()
}
_cardinalities.foreach { card =>
......@@ -92,7 +111,10 @@ trait OutputHintable[Out] { this: DataSet[Out] =>
object OutputHintableMacros {
def uniqueKey[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key]): c.Expr[Unit] = {
def uniqueKey[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
(c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key])
: c.Expr[Unit] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
......@@ -109,10 +131,14 @@ object OutputHintableMacros {
c.prefix.splice.addCardinality(card)
}
return result
result
}
def uniqueKeyWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key], distinctCount: c.Expr[Long]): c.Expr[Unit] = {
def uniqueKeyWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
(c: Context { type PrefixType = OutputHintable[Out] })
(fields: c.Expr[Out => Key], distinctCount: c.Expr[Long])
: c.Expr[Unit] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
......@@ -129,10 +155,13 @@ object OutputHintableMacros {
c.prefix.splice.addCardinality(card)
}
return result
result
}
def cardinality[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key]): c.Expr[Unit] = {
def cardinality[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
(c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key])
: c.Expr[Unit] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
......@@ -149,10 +178,14 @@ object OutputHintableMacros {
c.prefix.splice.addCardinality(card)
}
return result
result
}
def cardinalityWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key], distinctCount: c.Expr[Long]): c.Expr[Unit] = {
def cardinalityWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
(c: Context { type PrefixType = OutputHintable[Out] })
(fields: c.Expr[Out => Key], distinctCount: c.Expr[Long])
: c.Expr[Unit] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
......@@ -169,10 +202,12 @@ object OutputHintableMacros {
c.prefix.splice.addCardinality(card)
}
return result
result
}
def cardinalityWithAvgNumRecords[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key], avgNumRecords: c.Expr[Float]): c.Expr[Unit] = {
def cardinalityWithAvgNumRecords[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
(c: Context { type PrefixType = OutputHintable[Out] })
(fields: c.Expr[Out => Key], avgNumRecords: c.Expr[Float]): c.Expr[Unit] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
......@@ -189,10 +224,14 @@ object OutputHintableMacros {
c.prefix.splice.addCardinality(card)
}
return result
result
}
def cardinalityWithAll[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key], distinctCount: c.Expr[Long], avgNumRecords: c.Expr[Float]): c.Expr[Unit] = {
def cardinalityWithAll[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
(c: Context { type PrefixType = OutputHintable[Out] })
(fields: c.Expr[Out => Key], distinctCount: c.Expr[Long], avgNumRecords: c.Expr[Float])
: c.Expr[Unit] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
......@@ -201,15 +240,14 @@ object OutputHintableMacros {
val result = reify {
val contract = c.prefix.splice.getContract
val hints = contract.getCompilerHints
val keySelection = generatedKeySelector.splice
val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
val key = new FieldSelector(contract.getUDF.outputUDT, keySelection)
val card = KeyCardinality(key, false, Some(distinctCount.splice), Some(avgNumRecords.splice))
c.prefix.splice.addCardinality(card)
}
return result
result
}
}
......@@ -220,9 +258,12 @@ trait InputHintable[In, Out] { this: DataSet[Out] =>
def getInputUDT: UDT[In]
def getOutputUDT: UDT[Out]
def neglects[Fields](fields: In => Fields): Unit = macro InputHintableMacros.neglects[In, Out, Fields]
def observes[Fields](fields: In => Fields): Unit = macro InputHintableMacros.observes[In, Out, Fields]
def preserves[Fields](from: In => Fields, to: Out => Fields) = macro InputHintableMacros.preserves[In, Out, Fields]
def neglects[Fields](fields: In => Fields): Unit =
macro InputHintableMacros.neglects[In, Out, Fields]
def observes[Fields](fields: In => Fields): Unit =
macro InputHintableMacros.observes[In, Out, Fields]
def preserves[Fields](from: In => Fields, to: Out => Fields) =
macro InputHintableMacros.preserves[In, Out, Fields]
}
object InputHintable {
......@@ -235,7 +276,11 @@ object InputHintable {
object InputHintableMacros {
def neglects[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag](c: Context { type PrefixType = InputHintable[In, Out] })(fields: c.Expr[In => Fields]): c.Expr[Unit] = {
def neglects[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
(c: Context { type PrefixType = InputHintable[In, Out] })
(fields: c.Expr[In => Fields])
: c.Expr[Unit] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
......@@ -248,10 +293,14 @@ object InputHintableMacros {
val unreadFields = fieldSelector.selectedFields.map(_.localPos).toSet
unreadFields.foreach(c.prefix.splice.markUnread(_))
}
return result
result
}
def observes[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag](c: Context { type PrefixType = InputHintable[In, Out] })(fields: c.Expr[In => Fields]): c.Expr[Unit] = {
def observes[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
(c: Context { type PrefixType = InputHintable[In, Out] })
(fields: c.Expr[In => Fields])
: c.Expr[Unit] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
......@@ -265,14 +314,18 @@ object InputHintableMacros {
val unreadFields = fieldSelector.inputFields.map(_.localPos).toSet.diff(fieldSet)
unreadFields.foreach(c.prefix.splice.markUnread(_))
}
return result
result
}
def preserves[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag](c: Context { type PrefixType = InputHintable[In, Out] })(from: c.Expr[In => Fields], to: c.Expr[Out => Fields]): c.Expr[Unit] = {
def preserves[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
(c: Context { type PrefixType = InputHintable[In, Out] })
(from: c.Expr[In => Fields], to: c.Expr[Out => Fields])
: c.Expr[Unit] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
val slave = MacroContextHolder.newMacroHelper(c)
val generatedFromFieldSelector = slave.getSelector(from)
val generatedToFieldSelector = slave.getSelector(to)
......@@ -281,14 +334,16 @@ object InputHintableMacros {
val fromSelector = new FieldSelector(c.prefix.splice.getInputUDT, fromSelection)
val toSelection = generatedToFieldSelector.splice
val toSelector = new FieldSelector(c.prefix.splice.getOutputUDT, toSelection)
val pairs = fromSelector.selectedFields.map(_.localPos).zip(toSelector.selectedFields.map(_.localPos))
val pairs = fromSelector.selectedFields.map(_.localPos)
.zip(toSelector.selectedFields.map(_.localPos))
pairs.foreach(c.prefix.splice.markCopied.tupled)
}
return result
result
}
}
trait OneInputHintable[In, Out] extends InputHintable[In, Out] with OutputHintable[Out] { this: DataSet[Out] =>
trait OneInputHintable[In, Out] extends InputHintable[In, Out] with OutputHintable[Out] {
this: DataSet[Out] =>
override def markUnread = contract.getUDF.asInstanceOf[UDF1[In, Out]].markInputFieldUnread _
override def markCopied = contract.getUDF.asInstanceOf[UDF1[In, Out]].markFieldCopied _
......@@ -298,15 +353,19 @@ trait OneInputHintable[In, Out] extends InputHintable[In, Out] with OutputHintab
trait TwoInputHintable[LeftIn, RightIn, Out] extends OutputHintable[Out] { this: DataSet[Out] =>
val left = new DataSet[Out](contract) with OneInputHintable[LeftIn, Out] {
override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markInputFieldUnread(Left(pos))}
override def markCopied = { (from: Int, to: Int) => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Left(from), to)}
override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]]
.markInputFieldUnread(Left(pos))}
override def markCopied = { (from: Int, to: Int) => contract.getUDF
.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Left(from), to)}
override def getInputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].leftInputUDT
override def getOutputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].outputUDT
}
val right = new DataSet[Out](contract) with OneInputHintable[RightIn, Out] {
override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markInputFieldUnread(Right(pos))}
override def markCopied = { (from: Int, to: Int) => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Right(from), to)}
override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]]
.markInputFieldUnread(Right(pos))}
override def markCopied = { (from: Int, to: Int) => contract.getUDF
.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Right(from), to)}
override def getInputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].rightInputUDT
override def getOutputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].outputUDT
}
......
......@@ -28,7 +28,8 @@ import eu.stratosphere.types.Record
class DataSet[T] (val contract: Operator[Record] with ScalaOperator[T, Record]) {
def cogroup[RightIn](rightInput: DataSet[RightIn]) = new CoGroupDataSet[T, RightIn](this, rightInput)
def cogroup[RightIn](rightInput: DataSet[RightIn]) =
new CoGroupDataSet[T, RightIn](this, rightInput)
def cross[RightIn](rightInput: DataSet[RightIn]) = new CrossDataSet[T, RightIn](this, rightInput)
def join[RightIn](rightInput: DataSet[RightIn]) = new JoinDataSet[T, RightIn](this, rightInput)
......@@ -42,17 +43,28 @@ class DataSet[T] (val contract: Operator[Record] with ScalaOperator[T, Record])
// reduce without grouping
def reduce(fun: (T, T) => T) = macro ReduceMacros.globalReduce[T]
def reduceAll[Out](fun: Iterator[T] => Out) = macro ReduceMacros.globalReduceGroup[T, Out]
def combinableReduceAll[Out](fun: Iterator[T] => Out) = macro ReduceMacros.combinableGlobalReduceGroup[T]
def combinableReduceAll[Out](fun: Iterator[T] => Out) =
macro ReduceMacros.combinableGlobalReduceGroup[T]
def union(secondInput: DataSet[T]) = UnionOperator.impl[T](this, secondInput)
def iterateWithDelta[DeltaItem](stepFunction: DataSet[T] => (DataSet[T], DataSet[DeltaItem])) = macro IterateMacros.iterateWithDelta[T, DeltaItem]
def iterateWithDelta[DeltaItem](stepFunction: DataSet[T] => (DataSet[T], DataSet[DeltaItem])) =
macro IterateMacros.iterateWithDelta[T, DeltaItem]
def iterate(n: Int, stepFunction: DataSet[T] => DataSet[T])= macro IterateMacros.iterate[T]
def iterateWithTermination[C](n: Int, stepFunction: DataSet[T] => DataSet[T], terminationFunction: (DataSet[T],
DataSet[T]) => DataSet[C]) = macro IterateMacros.iterateWithTermination[T, C]
def iterateWithDelta[SolutionKey, WorksetItem](workset: DataSet[WorksetItem], solutionSetKey: T => SolutionKey, stepFunction: (DataSet[T], DataSet[WorksetItem]) => (DataSet[T], DataSet[WorksetItem]), maxIterations: Int) = macro WorksetIterateMacros.iterateWithDelta[T, SolutionKey, WorksetItem]
def iterateWithTermination[C](
n: Int,
stepFunction: DataSet[T] => DataSet[T],
terminationFunction: (DataSet[T],DataSet[T]) => DataSet[C]) =
macro IterateMacros.iterateWithTermination[T, C]
def iterateWithDelta[SolutionKey, WorksetItem](
workset: DataSet[WorksetItem],
solutionSetKey: T => SolutionKey,
stepFunction: (DataSet[T], DataSet[WorksetItem]) =>
(DataSet[T], DataSet[WorksetItem]), maxIterations: Int) =
macro WorksetIterateMacros.iterateWithDelta[T, SolutionKey, WorksetItem]
def write(url: String, format: ScalaOutputFormat[T]) = DataSinkOperator.write(this, url, format)
def write(url: String, format: ScalaOutputFormat[T], name: String) = DataSinkOperator.write(this, url, format, name)
def write(url: String, format: ScalaOutputFormat[T], name: String) =
DataSinkOperator.write(this, url, format, name)
}
\ No newline at end of file
......@@ -37,7 +37,7 @@ object DataSource {
override def getUDF = format.getUDF
override def persistConfiguration() = format.persistConfiguration(this.getParameters())
override def persistConfiguration() = format.persistConfiguration(this.getParameters)
}
// case "ext" => new GenericDataSource[GenericInputFormat[_]](format.asInstanceOf[GenericInputFormat[_]], uri.toString)
......
......@@ -26,8 +26,9 @@ import eu.stratosphere.api.common.operators.Union
import eu.stratosphere.types.Record
import eu.stratosphere.types.{Nothing => JavaNothing}
trait ScalaOperator[T, UT] { this: Operator[UT] =>
def getUDF(): UDF[T]
trait ScalaOperator[T, UT] {
this: Operator[UT] =>
def getUDF: UDF[T]
def getKeys: Seq[FieldSelector] = Seq()
def persistConfiguration(): Unit = {}
......@@ -41,14 +42,17 @@ trait ScalaOperator[T, UT] { this: Operator[UT] =>
for ((key, inputNum) <- getKeys.zipWithIndex) {
val source = key.selectedFields.toSerializerIndexArray
val target = optimizerNode map { _.getRemappedKeys(inputNum) } getOrElse { contract.getKeyColumns(inputNum) }
val target = optimizerNode map { _.getRemappedKeys(inputNum) } getOrElse {
contract.getKeyColumns(inputNum) }
assert(source.length == target.length, "Attempt to write " + source.length + " key indexes to an array of size " + target.length)
assert(source.length == target.length, "Attempt to write " + source.length +
" key indexes to an array of size " + target.length)
System.arraycopy(source, 0, target, 0, source.length)
}
}
case _ if getKeys.size > 0 => throw new UnsupportedOperationException("Attempted to set keys on a contract that doesn't support them")
case _ if getKeys.size > 0 => throw new UnsupportedOperationException("Attempted to set " +
"keys on a contract that doesn't support them")
case _ =>
}
......@@ -60,11 +64,14 @@ trait ScalaOperator[T, UT] { this: Operator[UT] =>
protected def annotations: Seq[Annotation] = Seq()
def getUserCodeAnnotation[A <: Annotation](annotationClass: Class[A]): A = {
val res = annotations find { _.annotationType().equals(annotationClass) } map { _.asInstanceOf[A] } getOrElse null.asInstanceOf[A]
val res = annotations find { _.annotationType().equals(annotationClass) } map {
_.asInstanceOf[A] } getOrElse null.asInstanceOf[A]
// println("returning ANOOT: " + res + " FOR: " + annotationClass.toString)
// res match {
// case r : FunctionAnnotation.ConstantFieldsFirst => println("CONSTANT FIELDS FIRST: " + r.value().mkString(","))
// case r : FunctionAnnotation.ConstantFieldsSecond => println("CONSTANT FIELDS SECOND: " + r.value().mkString(","))
// case r : FunctionAnnotation.ConstantFieldsFirst => println("CONSTANT FIELDS FIRST: " +
// r.value().mkString(","))
// case r : FunctionAnnotation.ConstantFieldsSecond => println("CONSTANT FIELDS SECOND: " +
// r.value().mkString(","))
// case _ =>
// }
res
......@@ -75,38 +82,46 @@ trait NoOpScalaOperator[In, Out] extends ScalaOperator[Out, Record] { this: Oper
}
trait HigherOrderScalaOperator[T] extends ScalaOperator[T, Record] { this: Operator[Record] =>
override def getUDF(): UDF0[T]
override def getUDF: UDF0[T]
}
trait BulkIterationScalaOperator[T] extends HigherOrderScalaOperator[T] { this: Operator[Record] =>
}
trait DeltaIterationScalaOperator[T] extends HigherOrderScalaOperator[T] { this: Operator[Record] =>
trait DeltaIterationScalaOperator[T] extends HigherOrderScalaOperator[T] {
this: Operator[Record] =>
val key: FieldSelector
}
trait ScalaOutputOperator[In] extends ScalaOperator[Nothing, JavaNothing] { this: Operator[JavaNothing] =>
override def getUDF(): UDF1[In, Nothing]
trait ScalaOutputOperator[In] extends ScalaOperator[Nothing, JavaNothing] {
this: Operator[JavaNothing] =>
override def getUDF: UDF1[In, Nothing]
}
trait OneInputScalaOperator[In, Out] extends ScalaOperator[Out, Record] { this: Operator[Record] =>
override def getUDF(): UDF1[In, Out]
override def getUDF: UDF1[In, Out]
}
trait TwoInputScalaOperator[In1, In2, Out] extends ScalaOperator[Out, Record] { this: Operator[Record] =>
override def getUDF(): UDF2[In1, In2, Out]
trait TwoInputScalaOperator[In1, In2, Out] extends ScalaOperator[Out, Record] {
this: Operator[Record] =>
override def getUDF: UDF2[In1, In2, Out]
}
trait UnionScalaOperator[In] extends TwoInputScalaOperator[In, In, In] { this: Union[Record] =>
override def getUDF(): UDF2[In, In, In]
trait UnionScalaOperator[In] extends TwoInputScalaOperator[In, In, In] {
this: Union[Record] =>
override def getUDF: UDF2[In, In, In]
}
trait OneInputKeyedScalaOperator[In, Out] extends OneInputScalaOperator[In, Out] { this: Operator[Record] =>
trait OneInputKeyedScalaOperator[In, Out] extends OneInputScalaOperator[In, Out] {
this: Operator[Record] =>
val key: FieldSelector
override def getKeys = Seq(key)
}
trait TwoInputKeyedScalaOperator[LeftIn, RightIn, Out] extends TwoInputScalaOperator[LeftIn, RightIn, Out] { this: Operator[Record] =>
trait TwoInputKeyedScalaOperator[LeftIn, RightIn, Out]
extends TwoInputScalaOperator[LeftIn, RightIn, Out] {
this: Operator[Record] =>
val leftKey: FieldSelector
val rightKey: FieldSelector
override def getKeys = Seq(leftKey, rightKey)
......
......@@ -18,49 +18,54 @@ import scala.reflect.macros.Context
import eu.stratosphere.api.java.record.operators.MapOperator
import eu.stratosphere.types.Record
import eu.stratosphere.api.common.operators.Operator
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.util.Collector
import eu.stratosphere.api.scala.codegen.{MacroContextHolder, Util}
import eu.stratosphere.api.scala._
import eu.stratosphere.api.scala.analysis._
import eu.stratosphere.api.scala.functions.{MapFunction, FlatMapFunction, FilterFunction, MapFunctionBase}
import eu.stratosphere.api.scala.functions.{MapFunction, MapFunctionBase}
import eu.stratosphere.api.scala.functions.{FlatMapFunction, FilterFunction}
object MapMacros {
def map[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[In => Out]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
def map[In: c.WeakTypeTag, Out: c.WeakTypeTag]
(c: Context { type PrefixType = DataSet[In] })
(fun: c.Expr[In => Out]): c.Expr[DataSet[Out]
with OneInputHintable[In, Out]] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
// val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
val (udtIn, createUdtIn) = slave.mkUdtClass[In]
val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
val stub: c.Expr[MapFunctionBase[In, Out]] = if (fun.actualType <:< weakTypeOf[MapFunction[In, Out]])
reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
else reify {
implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
new MapFunctionBase[In, Out] {
// val userFun = ClosureCleaner.clean(fun.splice)
// val userFun = fun.splice
override def map(record: Record, out: Collector[Record]) = {
val input = deserializer.deserializeRecyclingOn(record)
val output = fun.splice.apply(input)
record.setNumFields(outputLength)
for (field <- discard)
record.setNull(field)
serializer.serialize(output, record)
out.collect(record)
val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
val (udtOut, createUdtOut) = slave.mkUdtClass[Out]()
val stub: c.Expr[MapFunctionBase[In, Out]] =
if (fun.actualType <:< weakTypeOf[MapFunction[In, Out]])
reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
else
reify {
implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
new MapFunctionBase[In, Out] {
// val userFun = ClosureCleaner.clean(fun.splice)
// val userFun = fun.splice
override def map(record: Record, out: Collector[Record]) = {
val input = deserializer.deserializeRecyclingOn(record)
val output = fun.splice.apply(input)
record.setNumFields(outputLength)
for (field <- discard)
record.setNull(field)
serializer.serialize(output, record)
out.collect(record)
}
}
}
}
}
val contract = reify {
val input = c.prefix.splice.contract
......@@ -78,47 +83,53 @@ object MapMacros {
stream
}
val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
val result = c.Expr[DataSet[Out]
with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
return result
result
}
def flatMap[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[In => Iterator[Out]]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
def flatMap[In: c.WeakTypeTag, Out: c.WeakTypeTag]
(c: Context { type PrefixType = DataSet[In] })
(fun: c.Expr[In => Iterator[Out]]): c.Expr[DataSet[Out]
with OneInputHintable[In, Out]] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
// val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
val (udtIn, createUdtIn) = slave.mkUdtClass[In]
val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
val (udtOut, createUdtOut) = slave.mkUdtClass[Out]()
val stub: c.Expr[MapFunctionBase[In, Out]] = if (fun.actualType <:< weakTypeOf[FlatMapFunction[In, Out]])
reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
else reify {
implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
new MapFunctionBase[In, Out] {
override def map(record: Record, out: Collector[Record]) = {
val input = deserializer.deserializeRecyclingOn(record)
val output = fun.splice.apply(input)
val stub: c.Expr[MapFunctionBase[In, Out]] =
if (fun.actualType <:< weakTypeOf[FlatMapFunction[In, Out]])
reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
else reify {
implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
new MapFunctionBase[In, Out] {
override def map(record: Record, out: Collector[Record]) = {
val input = deserializer.deserializeRecyclingOn(record)
val output = fun.splice.apply(input)
if (output.nonEmpty) {
if (output.nonEmpty) {
record.setNumFields(outputLength)
record.setNumFields(outputLength)
for (field <- discard)
record.setNull(field)
for (field <- discard)
record.setNull(field)
for (item <- output) {
for (item <- output) {
serializer.serialize(item, record)
out.collect(record)
serializer.serialize(item, record)
out.collect(record)
}
}
}
}
}
}
val contract = reify {
val input = c.prefix.splice.contract
val generatedStub = ClosureCleaner.clean(stub.splice)
......@@ -135,33 +146,41 @@ object MapMacros {
stream
}
val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut),
contract.tree))
return result
result
}
def filter[In: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[In => Boolean]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
def filter[In: c.WeakTypeTag]
(c: Context { type PrefixType = DataSet[In] })
(fun: c.Expr[In => Boolean]): c.Expr[DataSet[In]
with OneInputHintable[In, In]] = {
import c.universe._
val slave = MacroContextHolder.newMacroHelper(c)
// val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
val (udtIn, createUdtIn) = slave.mkUdtClass[In]
val stub: c.Expr[MapFunctionBase[In, In]] = if (fun.actualType <:< weakTypeOf[FilterFunction[In, In]])
reify { fun.splice.asInstanceOf[MapFunctionBase[In, In]] }
else reify {
implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
new MapFunctionBase[In, In] {
override def map(record: Record, out: Collector[Record]) = {
val input = deserializer.deserializeRecyclingOn(record)
if (fun.splice.apply(input)) {
out.collect(record)
// val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
val stub: c.Expr[MapFunctionBase[In, In]] =
if (fun.actualType <:< weakTypeOf[FilterFunction[In, In]])
reify { fun.splice.asInstanceOf[MapFunctionBase[In, In]] }
else
reify {
implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
new MapFunctionBase[In, In] {
override def map(record: Record, out: Collector[Record]) = {
val input = deserializer.deserializeRecyclingOn(record)
if (fun.splice.apply(input)) {
out.collect(record)
}
}
}
}
}
}
val contract = reify {
val input = c.prefix.splice.contract
val generatedStub = ClosureCleaner.clean(stub.splice)
......@@ -175,14 +194,15 @@ object MapMacros {
}
val stream = new DataSet[In](contract) with OneInputHintable[In, In] {}
contract.persistHints = { () =>
stream.applyHints(contract);
stream.applyHints(contract)
0 until generatedStub.udf.getOutputLength foreach { i => stream.markCopied(i, i) }
}
stream
}
val result = c.Expr[DataSet[In] with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
val result = c.Expr[DataSet[In]
with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
return result
result
}
}
......@@ -23,12 +23,14 @@ import eu.stratosphere.types.Record
object UnionOperator {
def impl[In](firstInput: DataSet[In], secondInput: DataSet[In]): DataSet[In] = {
val union = new Union[Record](firstInput.contract, secondInput.contract) with UnionScalaOperator[In] {
private val inputUDT = firstInput.contract.getUDF().outputUDT
val union = new Union[Record](firstInput.contract, secondInput.contract)
with UnionScalaOperator[In] {
private val inputUDT = firstInput.contract.getUDF.outputUDT
private val udf: UDF2[In, In, In] = new UDF2(inputUDT, inputUDT, inputUDT)
override def getUDF = udf;
override def getUDF = udf
}
return new DataSet(union)
new DataSet(union)
}
}
......@@ -319,7 +319,7 @@ public class FilterITCase extends JavaProgramTestBase {
public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
return (value.f1 == (broadcastSum / 11));
}
}).withBroadcastSet(intDs, "ints");;
}).withBroadcastSet(intDs, "ints");
filterDs.writeAsCsv(resultPath);
env.execute();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册