diff --git a/.gitignore b/.gitignore
index af23bc2e05ed9166cb73085df497afae3b4b9dad..1bca0cf2e243847b0929458f8fe5b975e1285b11 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 .cache
+scalastyle-output.xml
 .classpath
 .idea
 .metadata
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
index df28ad012ac76691d9a3ca9d300a7f01216ece53..e00b2ddc2ea7631c3c4696f3b77f9716852f3d00 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
@@ -71,7 +71,8 @@ object ConnectedComponents {
     // assign the initial components (equal to the vertex id)
     val vertices = getVerticesDataSet(env).map { id => (id, id) }
 
-    // undirected edges by emitting for each input edge the input edges itself and an inverted version
+    // undirected edges by emitting for each input edge the input edges itself and an inverted
+    // version
     val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
 
     // open a delta iteration
@@ -106,20 +107,22 @@ object ConnectedComponents {
 
   private def parseParameters(args: Array[String]): Boolean = {
     if (args.length > 0) {
-      fileOutput = true
+      fileOutput = true
       if (args.length == 4) {
         verticesPath = args(0)
         edgesPath = args(1)
         outputPath = args(2)
         maxIterations = args(3).toInt
       } else {
-        System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>")
+        System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
+          " <max number of iterations>")
         false
       }
     } else {
       System.out.println("Executing Connected Components example with built-in default data.")
       System.out.println("  Provide parameters to read input data from a file.")
-      System.out.println("  Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>")
+      System.out.println("  Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
+        " <max number of iterations>")
     }
     true
   }
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
index fe121d50d72b5900dcc12b22905711aff8b60573..da1c07834280e9eecd2bd1eedb097bd0d25b80bf 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -65,111 +65,113 @@ import scala.collection.mutable
  *
  */
 object EnumTrianglesBasic {
-
-    def main(args: Array[String]) {
-        if (!parseParameters(args)) {
-            return
-        }
-
-        // set up execution environment
-        val env = ExecutionEnvironment.getExecutionEnvironment
-
-        // read input data
-        val edges = getEdgeDataSet(env)
-
-        // project edges by vertex id
-        val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) )
-
-        val triangles = edgesById
-            // build triads
-            .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
-            // filter triads
-            .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t }
-
-        // emit result
-        if (fileOutput) {
-            triangles.writeAsCsv(outputPath, "\n", ",")
-        } else {
-            triangles.print()
-        }
-
-        // execute program
-        env.execute("TriangleEnumeration Example")
-    }
-
-    // *************************************************************************
-    // USER DATA TYPES
-    // *************************************************************************
-
-    case class Edge(v1: Int, v2: Int) extends Serializable
-    case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
-
-
-    // *************************************************************************
-    // USER FUNCTIONS
-    // *************************************************************************
-
-    /**
-     * Builds triads (triples of vertices) from pairs of edges that share a vertex.
-     * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
-     * Assumes that input edges share the first vertex and are in ascending order of the second vertex.
-     */
-    class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
-
-        val vertices = mutable.MutableList[Integer]()
-
-        override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
-
-            // clear vertex list
-            vertices.clear()
-
-            // build and emit triads
-            for(e <- edges.asScala) {
-
-                // combine vertex with all previously read vertices
-                for(v <- vertices) {
-                    out.collect(Triad(e.v1, v, e.v2))
-                }
-                vertices += e.v2
-            }
-        }
-    }
-
-    // *************************************************************************
-    // UTIL METHODS
-    // *************************************************************************
-
-    private def parseParameters(args: Array[String]): Boolean = {
-        if (args.length > 0) {
-            fileOutput = true
-            if (args.length == 2) {
-                edgePath = args(0)
-                outputPath = args(1)
-            } else {
-                System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
-                false
-            }
-        } else {
-            System.out.println("Executing Enum Triangles Basic example with built-in default data.")
-            System.out.println("  Provide parameters to read input data from files.")
-            System.out.println("  See the documentation for the correct format of input files.")
-            System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>")
-        }
-        true
-    }
-
-    private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
-        if (fileOutput) {
-            env.readCsvFile[Edge](edgePath, fieldDelimiter = ' ', includedFields = Array(0, 1))
-        } else {
-            val edges = EnumTrianglesData.EDGES.map{ case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int]) }
-            env.fromCollection(edges)
-        }
-    }
-
-
-    private var fileOutput: Boolean = false
-    private var edgePath: String = null
-    private var outputPath: String = null
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val edges = getEdgeDataSet(env)
+
+    // project edges by vertex id
+    val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) )
+
+    val triangles = edgesById
+      // build triads
+      .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
+      // filter triads
+      .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t }
+
+    // emit result
+    if (fileOutput) {
+      triangles.writeAsCsv(outputPath, "\n", ",")
+    } else {
+      triangles.print()
+    }
+
+    // execute program
+    env.execute("TriangleEnumeration Example")
+  }
+
+  // *************************************************************************
+  // USER DATA TYPES
+  // *************************************************************************
+
+  case class Edge(v1: Int, v2: Int) extends Serializable
+  case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
+
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Builds triads (triples of vertices) from pairs of edges that share a vertex. The first vertex
+   * of a triad is the shared vertex, the second and third vertex are ordered by vertexId. Assumes
+   * that input edges share the first vertex and are in ascending order of the second vertex.
+   */
+  class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
+
+    val vertices = mutable.MutableList[Integer]()
+
+    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
+
+      // clear vertex list
+      vertices.clear()
+
+      // build and emit triads
+      for(e <- edges.asScala) {
+
+        // combine vertex with all previously read vertices
+        for(v <- vertices) {
+          out.collect(Triad(e.v1, v, e.v2))
+        }
+        vertices += e.v2
+      }
+    }
+  }
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 2) {
+        edgePath = args(0)
+        outputPath = args(1)
+      } else {
+        System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
+        false
+      }
+    } else {
+      System.out.println("Executing Enum Triangles Basic example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>")
+    }
+    true
+  }
+
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
+    if (fileOutput) {
+      env.readCsvFile[Edge](edgePath, fieldDelimiter = ' ', includedFields = Array(0, 1))
+    } else {
+      val edges = EnumTrianglesData.EDGES.map {
+        case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])
+      }
+      env.fromCollection(edges)
+    }
+  }
+
+
+  private var fileOutput: Boolean = false
+  private var edgePath: String = null
+  private var outputPath: String = null
 }
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index 5f515c46b1177eebf032baab7c69ff0024b5c8cb..7fc9af7fd22cc261a4a2927e6a76e14d0e2bd574 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -86,7 +86,9 @@ object PageRankBasic {
       // initialize lists
       .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
       // concatenate lists
-      .groupBy("sourceId").reduce((l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds))
+      .groupBy("sourceId").reduce {
+        (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
+      }
 
     // start iteration
     val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index 5416bb49bf8c95e9328ceb6c5278bb09ab832d93..68f57e41a3c8f0ca0b5bf61027670fd9446978b4 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -23,90 +23,93 @@ import org.apache.flink.util.Collector
 
 object TransitiveClosureNaive {
 
-    def main (args: Array[String]): Unit = {
-        if (!parseParameters(args)) {
-            return
-        }
-
-        val env = ExecutionEnvironment.getExecutionEnvironment
-
-        val edges = getEdgesDataSet(env)
-
-        val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long, Long)] =>
-
-            val nextPaths = prevPaths
-                .join(edges)
-                .where(1).equalTo(0) {
-                    (left, right) => (left._1,right._2)
-                }
-                .union(prevPaths)
-                .groupBy(0, 1)
-                .reduce((l, r) => l)
-
-            val terminate = prevPaths
-                .coGroup(nextPaths)
-                .where(0).equalTo(0) {
-                    (prev, next, out: Collector[(Long, Long)]) => {
-                        val prevPaths = prev.toSet
-                        for (n <- next)
-                            if (!prevPaths.contains(n))
-                                out.collect(n)
-                    }
-                }
-            (nextPaths, terminate)
-        }
-
-        if (fileOutput)
-            paths.writeAsCsv(outputPath, "\n", " ")
-        else
-            paths.print()
-
-        env.execute("Scala Transitive Closure Example")
-
-
-    }
-
-
-    private var fileOutput: Boolean = false
-    private var edgesPath: String = null
-    private var outputPath: String = null
-    private var maxIterations: Int = 10
-
-    private def parseParameters(programArguments: Array[String]): Boolean = {
-        if (programArguments.length > 0) {
-            fileOutput = true
-            if (programArguments.length == 3) {
-                edgesPath = programArguments(0)
-                outputPath = programArguments(1)
-                maxIterations = Integer.parseInt(programArguments(2))
-            }
-            else {
-                System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>")
-                return false
-            }
-        }
-        else {
-            System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.")
-            System.out.println("  Provide parameters to read input data from files.")
-            System.out.println("  See the documentation for the correct format of input files.")
-            System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of iterations>")
-        }
-        true
-    }
-
-    private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
-        if (fileOutput) {
-            env.readCsvFile[(Long, Long)](
-                edgesPath,
-                fieldDelimiter = ' ',
-                includedFields = Array(0, 1))
-                .map { x => (x._1, x._2)}
-        }
-        else {
-            val edgeData = ConnectedComponentsData.EDGES map {
-                case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
-            }
-            env.fromCollection(edgeData)
-        }
-    }
-}
\ No newline at end of file
+  def main (args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val edges = getEdgesDataSet(env)
+
+    val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long, Long)] =>
+
+      val nextPaths = prevPaths
+        .join(edges)
+        .where(1).equalTo(0) {
+          (left, right) => (left._1,right._2)
+        }
+        .union(prevPaths)
+        .groupBy(0, 1)
+        .reduce((l, r) => l)
+
+      val terminate = prevPaths
+        .coGroup(nextPaths)
+        .where(0).equalTo(0) {
+          (prev, next, out: Collector[(Long, Long)]) => {
+            val prevPaths = prev.toSet
+            for (n <- next)
+              if (!prevPaths.contains(n)) out.collect(n)
+          }
+        }
+      (nextPaths, terminate)
+    }
+
+    if (fileOutput) {
+      paths.writeAsCsv(outputPath, "\n", " ")
+    } else {
+      paths.print()
+    }
+
+    env.execute("Scala Transitive Closure Example")
+
+
+  }
+
+
+  private var fileOutput: Boolean = false
+  private var edgesPath: String = null
+  private var outputPath: String = null
+  private var maxIterations: Int = 10
+
+  private def parseParameters(programArguments: Array[String]): Boolean = {
+    if (programArguments.length > 0) {
+      fileOutput = true
+      if (programArguments.length == 3) {
+        edgesPath = programArguments(0)
+        outputPath = programArguments(1)
+        maxIterations = Integer.parseInt(programArguments(2))
+      }
+      else {
+        System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>")
+        return false
+      }
+    }
+    else {
+      System.out.println("Executing TransitiveClosure example with default parameters and " +
+        "built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of iterations>")
+    }
+    true
+  }
+
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](
+        edgesPath,
+        fieldDelimiter = ' ',
+        includedFields = Array(0, 1))
+        .map { x => (x._1, x._2)}
+    }
+    else {
+      val edgeData = ConnectedComponentsData.EDGES map {
+        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+      }
+      env.fromCollection(edgeData)
+    }
+  }
+}
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
index bb66b1013c0c3b71bf43644e64243c5f6548f5c6..d08e6e2059d5be413c4d6b4ff73954c9a4a57b70 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
@@ -37,7 +37,7 @@ object PiEstimation {
         val y = Math.random()
         if (x * x + y * y < 1) 1L else 0L
       }
-      .reduce(_+_)
+      .reduce(_ + _)
 
     // ratio of samples in upper right quadrant vs total samples gives surface of upper
     // right quadrant, times 4 gives surface of whole unit circle, i.e. PI
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
index 95dcb9aedcf39d401fcdba1da8b3203e58696f89..eed11a5a875d76356f497120ee0b35a5875a963d 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
@@ -62,101 +62,101 @@ import scala.collection.JavaConverters._
  */
 object LinearRegression {
 
-    def main(args: Array[String]) {
-        if (!parseParameters(args)) {
-            return
-        }
-
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val data = getDataSet(env)
-        val parameters = getParamsDataSet(env)
-
-        val result = parameters.iterate(numIterations) { currentParameters =>
-            val newParameters = data
-                .map(new SubUpdate).withBroadcastSet(currentParameters, "parameters")
-                .reduce { (p1, p2) =>
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val data = getDataSet(env)
+    val parameters = getParamsDataSet(env)
+
+    val result = parameters.iterate(numIterations) { currentParameters =>
+      val newParameters = data
+        .map(new SubUpdate).withBroadcastSet(currentParameters, "parameters")
+        .reduce { (p1, p2) =>
           val result = p1._1 + p2._1
-                    (result, p1._2 + p2._2)
-                }
-                .map { x => x._1.div(x._2) }
-            newParameters
-        }
-
-        if (fileOutput) {
-            result.writeAsText(outputPath)
-        }
-        else {
-            result.print()
-        }
-        env.execute("Scala Linear Regression example")
-    }
-
-    /**
-     * A simple data sample, x means the input, and y means the target.
-     */
+          (result, p1._2 + p2._2)
+        }
+        .map { x => x._1.div(x._2) }
+      newParameters
+    }
+
+    if (fileOutput) {
+      result.writeAsText(outputPath)
+    }
+    else {
+      result.print()
+    }
+    env.execute("Scala Linear Regression example")
+  }
+
+  /**
+   * A simple data sample, x means the input, and y means the target.
+   */
   case class Data(var x: Double, var y: Double)
 
-    /**
-     * A set of parameters -- theta0, theta1.
-     */
+  /**
+   * A set of parameters -- theta0, theta1.
+   */
   case class Params(theta0: Double, theta1: Double) {
     def div(a: Int): Params = {
       Params(theta0 / a, theta1 / a)
     }
 
-    def +(other: Params) = {
+    def + (other: Params) = {
       Params(theta0 + other.theta0, theta1 + other.theta1)
     }
   }
 
-    // *************************************************************************
-    // USER FUNCTIONS
-    // *************************************************************************
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
 
-    /**
-     * Compute a single BGD type update for every parameters.
-     */
-    class SubUpdate extends RichMapFunction[Data, (Params, Int)] {
+  /**
+   * Compute a single BGD type update for every parameters.
+   */
+  class SubUpdate extends RichMapFunction[Data, (Params, Int)] {
 
-        private var parameter: Params = null
+    private var parameter: Params = null
 
-        /** Reads the parameters from a broadcast variable into a collection. */
-        override def open(parameters: Configuration) {
-            val parameters = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala
+    /** Reads the parameters from a broadcast variable into a collection. */
+    override def open(parameters: Configuration) {
+      val parameters = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala
       parameter = parameters.head
-        }
+    }
 
-        def map(in: Data): (Params, Int) = {
-            val theta0 =
+    def map(in: Data): (Params, Int) = {
+      val theta0 =
         parameter.theta0 - 0.01 * ((parameter.theta0 + (parameter.theta1 * in.x)) - in.y)
-            val theta1 =
+      val theta1 =
         parameter.theta1 - 0.01 * (((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) * in.x)
-            (Params(theta0, theta1), 1)
-        }
-    }
-
-    // *************************************************************************
-    // UTIL METHODS
-    // *************************************************************************
-    private var fileOutput: Boolean = false
-    private var dataPath: String = null
-    private var outputPath: String = null
-    private var numIterations: Int = 10
-
-    private def parseParameters(programArguments: Array[String]): Boolean = {
-        if (programArguments.length > 0) {
-            fileOutput = true
-            if (programArguments.length == 3) {
-                dataPath = programArguments(0)
-                outputPath = programArguments(1)
-                numIterations = programArguments(2).toInt
-            }
-            else {
-                System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>")
-                false
-            }
-        }
-        else {
+      (Params(theta0, theta1), 1)
+    }
+  }
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+  private var fileOutput: Boolean = false
+  private var dataPath: String = null
+  private var outputPath: String = null
+  private var numIterations: Int = 10
+
+  private def parseParameters(programArguments: Array[String]): Boolean = {
+    if (programArguments.length > 0) {
+      fileOutput = true
+      if (programArguments.length == 3) {
+        dataPath = programArguments(0)
+        outputPath = programArguments(1)
+        numIterations = programArguments(2).toInt
+      }
+      else {
+        System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>")
+        false
+      }
+    }
+    else {
       System.out.println("Executing Linear Regression example with default parameters and " +
         "built-in default data.")
       System.out.println("  Provide parameters to read input data from files.")
@@ -164,30 +164,30 @@ object LinearRegression {
       System.out.println("  We provide a data generator to create synthetic input files for this " +
         "program.")
       System.out.println("  Usage: LinearRegression <data path> <result path> <num iterations>")
-    }
-    true
-    }
-
-    private def getDataSet(env: ExecutionEnvironment): DataSet[Data] = {
-        if (fileOutput) {
-            env.readCsvFile[(Double, Double)](
-                dataPath,
-                fieldDelimiter = ' ',
-                includedFields = Array(0, 1))
-                .map { t => new Data(t._1, t._2) }
-        }
-        else {
-            val data = LinearRegressionData.DATA map {
-                case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
-            }
-            env.fromCollection(data)
-        }
-    }
-
-    private def getParamsDataSet(env: ExecutionEnvironment): DataSet[Params] = {
-        val params = LinearRegressionData.PARAMS map {
-            case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
-        }
-        env.fromCollection(params)
-    }
+    }
+    true
+  }
+
+  private def getDataSet(env: ExecutionEnvironment): DataSet[Data] = {
+    if (fileOutput) {
+      env.readCsvFile[(Double, Double)](
+        dataPath,
+        fieldDelimiter = ' ',
+        includedFields = Array(0, 1))
+        .map { t => new Data(t._1, t._2) }
+    }
+    else {
+      val data = LinearRegressionData.DATA map {
+        case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
+      }
+      env.fromCollection(data)
+    }
+  }
+
+  private def getParamsDataSet(env: ExecutionEnvironment): DataSet[Params] = {
+    val params = LinearRegressionData.PARAMS map {
+      case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
+    }
+    env.fromCollection(params)
+  }
 }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index e9730934cbbc01275bdc3527002f5fb2c44f8830..0d0519c291b81cc58d822f258917522de84bb651 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.api.scala
 
 import org.apache.commons.lang3.Validate
@@ -610,9 +609,9 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
       new Keys.FieldPositionKeys[T](fieldIndices, set.getType,false))
   }
 
-  // public UnsortedGrouping<T> groupBy(String... fields) {
-  //   new UnsortedGrouping<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
-  // }
+  // public UnsortedGrouping<T> groupBy(String... fields) {
+  //   new UnsortedGrouping<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
+  // }
 
   // --------------------------------------------------------------------------------------------
   //  Joining
@@ -807,7 +806,7 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
 
   /**
    * Creates a new DataSet by performing delta (or workset) iterations using the given step
-   * function. At the beginning `this` DataSet is the solution set and `workset` is the Workset.
+   * function. At the beginning `this` DataSet is the solution set and `workset` is the Workset.
    * The iteration step function gets the current solution set and workset and must output the
    * delta for the solution set and the workset for the next iteration.
    *
@@ -825,6 +824,28 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
     wrap(result)
   }
 
+  /**
+   * Creates a new DataSet by performing delta (or workset) iterations using the given step
+   * function. At the beginning `this` DataSet is the solution set and `workset` is the Workset.
+   * The iteration step function gets the current solution set and workset and must output the
+   * delta for the solution set and the workset for the next iteration.
+   *
+   * Note: The syntax of delta iterations is very likely going to change soon.
+   */
+  def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[String])(
+    stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
+    val fieldIndices = fieldNames2Indices(set.getType, keyFields)
+
+    val key = new FieldPositionKeys[T](fieldIndices, set.getType, false)
+    val iterativeSet = new DeltaIteration[T, R](
+      set.getExecutionEnvironment, set.getType, set, workset.set, key, maxIterations)
+    val (newSolution, newWorkset) = stepFunction(
+      wrap(iterativeSet.getSolutionSet),
+      wrap(iterativeSet.getWorkset))
+    val result = iterativeSet.closeWith(newSolution.set, newWorkset.set)
+    wrap(result)
+  }
+
   // -------------------------------------------------------------------------------------------
   //  Custom Operators
   // -------------------------------------------------------------------------------------------
@@ -919,4 +940,4 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
   def printToErr(): DataSink[T] = {
     output(new PrintingOutputFormat[T](true))
   }
-}
\ No newline at end of file
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 6f44e684bdbe46073550eecdeb2ac785a43b19a9..30b4525613a219156b60688067695b5b62af34ff 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -22,7 +22,7 @@ import java.util.UUID
 import org.apache.commons.lang3.Validate
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.io._
-import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, BasicTypeInfo}
+import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase, BasicTypeInfo}
 import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
 import org.apache.flink.core.fs.Path
 
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
 import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
 import org.apache.flink.api.java.operators.DataSource
 
-import org.apache.flink.types.TypeInformation
+import org.apache.flink.types.{StringValue, TypeInformation}
 import org.apache.flink.util.{NumberSequenceIterator, SplittableIterator}
 
 import scala.collection.JavaConverters._
@@ -104,6 +104,27 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     wrap(source)
   }
 
+  /**
+   * Creates a DataSet of Strings produced by reading the given file line wise.
+   * This method is similar to [[readTextFile]], but it produces a DataSet with mutable
+   * [[StringValue]] objects, rather than Java Strings. StringValues can be used to tune
+   * implementations to be less object and garbage collection heavy.
+   *
+   * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
+   *                 "hdfs://host:port/file/path").
+   * @param charsetName The name of the character set used to read the file. Default is UTF-8.
+   */
+  def readTextFileWithValue(
+      filePath: String,
+      charsetName: String = "UTF-8"): DataSet[StringValue] = {
+    Validate.notNull(filePath, "The file path may not be null.")
+    val format = new TextValueInputFormat(new Path(filePath))
+    format.setCharsetName(charsetName)
+    val source = new DataSource[StringValue](
+      javaEnv, format, new ValueTypeInfo[StringValue](classOf[StringValue]))
+    wrap(source)
+  }
+
   /**
    * Creates a DataSet by reading the given CSV file. The type parameter must be used to specify
   * a Tuple type that has the same number of fields as there are fields in the CSV file. If the
@@ -337,8 +358,9 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def createProgramPlan(jobName: String = "") = {
     if (jobName.isEmpty) {
       javaEnv.createProgramPlan()
-    } else
+    } else {
       javaEnv.createProgramPlan(jobName)
+    }
   }
 }
 
@@ -360,7 +382,8 @@ object ExecutionEnvironment {
   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
   */
   def createLocalEnvironment(
-    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()) : ExecutionEnvironment = {
+    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors())
+    : ExecutionEnvironment = {
     val javaEnv = JavaEnv.createLocalEnvironment()
     javaEnv.setDegreeOfParallelism(degreeOfParallelism)
     new ExecutionEnvironment(javaEnv)
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index 802fd09ff6910fe5f96d32d810bc7fd509b66fdc..3c5bf9e322323fb7750069135e66b35aa2d6611e 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -267,7 +267,7 @@ private[flink] class GroupedDataSetImpl[T: ClassTag](
   }
 
   def reduceGroup[R: TypeInformation: ClassTag](
-      fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R] = {
+      fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R] = {
     Validate.notNull(fun, "Group reduce function must not be null.")
     val reducer = new GroupReduceFunction[T, R] {
       def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
index b0661a1c1198d44fdcef43309dbfebff58d05c2b..582edac0ef23d03b84b93c4164307fe5b2d4f379 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
@@ -33,8 +33,8 @@ import scala.reflect.ClassTag
 
 /**
- * A specific [[DataSet]] that results from a `coGroup` operation. The result of a default coGroup is
- * a tuple containing two arrays of values from the two sides of the coGroup. The result of the
+ * A specific [[DataSet]] that results from a `coGroup` operation. The result of a default coGroup
+ * is a tuple containing two arrays of values from the two sides of the coGroup. The result of the
  * coGroup can be changed by specifying a custom coGroup function using the `apply` method or by
  * providing a [[RichCoGroupFunction]].
  *
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/Counter.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/Counter.scala
index d5381887cb91f70ffa93e39e89a6e1cef2c52dd7..caa43241942c0903aa280f4348a941aeb7de8ea2 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/Counter.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/Counter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,8 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package org.apache.flink.api.scala.codegen
 
 private[flink] class Counter {
@@ -29,4 +27,4 @@ private[flink] class Counter {
       current
     }
   }
-}
\ No newline at end of file
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/MacroContextHolder.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/MacroContextHolder.scala
index 4ce4922b0331f6b0f5c89b3e7fc73600be7804f5..9f176914e91e89cdf198f2a77959e370a3c3e579 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/MacroContextHolder.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/MacroContextHolder.scala
@@ -7,7 +7,7 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,8 +24,8 @@ private[flink] class MacroContextHolder[C <: Context](val c: C)
 private[flink] object MacroContextHolder {
   def newMacroHelper[C <: Context](c: C) = new MacroContextHolder[c.type](c)
-    with TypeDescriptors[c.type]
-    with TypeAnalyzer[c.type]
-    with TreeGen[c.type]
+    with TypeDescriptors[c.type]
+    with TypeAnalyzer[c.type]
+    with TreeGen[c.type]
     with TypeInformationGen[c.type]
 }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
index 89454d5af94603636e2c88e4b3faffbd556dbfc7..7d04b1ba96f88d7552ab969dfb08df6407bbc538 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
@@ -7,7 +7,7 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,8 +15,6 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
-
 package org.apache.flink.api.scala.codegen
 
 import scala.language.implicitConversions
@@ -50,10 +48,11 @@ private[flink] trait TreeGen[C <: Context] { this: MacroContextHolder[C] with Ty
     reify(c.Expr(source).splice.asInstanceOf[T]).tree
 
   def maybeMkAsInstanceOf[S: c.WeakTypeTag, T: c.WeakTypeTag](source: Tree): Tree = {
-    if (weakTypeOf[S] <:< weakTypeOf[T])
+    if (weakTypeOf[S] <:< weakTypeOf[T]) {
       source
-    else
+    } else {
       mkAsInstanceOf[T](source)
+    }
   }
 
   // def mkIdent(target: Symbol): Tree = Ident(target) setType target.tpe
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
index 7b1675d852b4a53867066db0c7e287a8d0cbeb14..6bf827e207e27593b2e1a19006f47fb482261f07 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
@@ -7,7 +7,7 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,8 +15,6 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
-
 package org.apache.flink.api.scala.codegen
 
 import scala.Option.option2Iterable
@@ -107,10 +105,11 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
         appliedType(d.asType.toType, dArgs)
       }
 
-      if (dTpe <:< tpe)
+      if (dTpe <:< tpe) {
         Some(analyze(dTpe))
-      else
+      } else {
         None
+      }
     }
 
     val errors = subTypes flatMap { _.findByType[UnsupportedDescriptor] }
@@ -150,7 +149,11 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
           case true => Some(
             FieldAccessor(
-              bGetter, bSetter, bTpe, isBaseField = true, analyze(bTpe.termSymbol.asMethod.returnType)))
+              bGetter,
+              bSetter,
+              bTpe,
+              isBaseField = true,
+              analyze(bTpe.termSymbol.asMethod.returnType)))
           case false => None
         }
       }
@@ -167,7 +170,9 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
       desc match {
         case desc @ BaseClassDescriptor(_, _, getters, baseSubTypes) =>
-          desc.copy(getters = getters map updateField, subTypes = baseSubTypes map wireBaseFields)
+          desc.copy(
+            getters = getters map updateField,
+            subTypes = baseSubTypes map wireBaseFields)
         case desc @ CaseClassDescriptor(_, _, _, _, getters) =>
           desc.copy(getters = getters map updateField)
         case _ => desc
@@ -221,7 +226,7 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
       case errs @ _ :: _ =>
         val msgs = errs flatMap { f =>
           (f: @unchecked) match {
-            case FieldAccessor(fgetter, _, _, _, UnsupportedDescriptor(_, fTpe, errors)) =>
+            case FieldAccessor(fgetter, _,_,_, UnsupportedDescriptor(_, fTpe, errors)) =>
               errors map { err => "Field " + fgetter.name + ": " + fTpe + " - " + err }
           }
         }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
index bbb9e735c86fc93f6c3a7f78e6bdd44863ccee13..34a7a97c3ccb5bffce39be16311e9b528f5db2a4 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
@@ -7,7 +7,7 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,8 +15,6 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
-
 package org.apache.flink.api.scala.codegen
 
 import scala.language.postfixOps
@@ -122,7 +120,8 @@ private[flink] trait TypeDescriptors[C <: Context] { this: MacroContextHolder[C]
       id: Int, tpe: Type, override val getters: Seq[FieldAccessor],
       subTypes: Seq[UDTDescriptor]) extends UDTDescriptor {
 
-    override def flatten = this +: ((getters flatMap { _.desc.flatten }) ++ (subTypes flatMap { _.flatten }))
+    override def flatten =
+      this +: ((getters flatMap { _.desc.flatten }) ++ (subTypes flatMap { _.flatten }))
     override def canBeKey = flatten forall { f => f.canBeKey }
 
     override def select(path: List[String]): Seq[Option[UDTDescriptor]] = path match {
@@ -151,7 +150,8 @@ private[flink] trait TypeDescriptors[C <: Context] { this: MacroContextHolder[C]
     override def hashCode = (id, tpe, ctor, getters).hashCode
     override def equals(that: Any) = that match {
       case CaseClassDescriptor(thatId, thatTpe, thatMutable, thatCtor, thatGetters) =>
-        (id, tpe, mutable, ctor, getters).equals(thatId, thatTpe, thatMutable, thatCtor, thatGetters)
+        (id, tpe, mutable, ctor, getters).equals(
+          thatId, thatTpe, thatMutable, thatCtor, thatGetters)
       case _ => false
     }
 
@@ -164,7 +164,12 @@ private[flink] trait TypeDescriptors[C <: Context] { this: MacroContextHolder[C]
     }
   }
 
-  case class FieldAccessor(getter: Symbol, setter: Symbol, tpe: Type, isBaseField: Boolean, desc: UDTDescriptor)
+  case class FieldAccessor(
+      getter: Symbol,
+      setter: Symbol,
+      tpe: Type,
+      isBaseField: Boolean,
+      desc: UDTDescriptor)
 
   case class RecursiveDescriptor(id: Int, tpe: Type, refId: Int) extends UDTDescriptor {
     override def flatten = Seq(this)
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index 6ad1f742e6c2a265bfcc054105f7c4afb36ff6cb..235caa729ffc3ff4b246c85c51527e0c3d4e4ad1 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -7,7 +7,7 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,8 +15,6 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
-
 package org.apache.flink.api.scala.codegen
 
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -116,14 +114,16 @@ private[flink] trait TypeInformationGen[C <: Context] {
     }
   }
 
-  def mkValueTypeInfo[T <: Value : c.WeakTypeTag](desc: UDTDescriptor): c.Expr[TypeInformation[T]] = {
+  def mkValueTypeInfo[T <: Value : c.WeakTypeTag](
+      desc: UDTDescriptor): c.Expr[TypeInformation[T]] = {
     val tpeClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
     reify {
       new ValueTypeInfo[T](tpeClazz.splice)
     }
   }
 
-  def mkWritableTypeInfo[T <: Writable : c.WeakTypeTag](desc: UDTDescriptor): c.Expr[TypeInformation[T]] = {
+  def mkWritableTypeInfo[T <: Writable : c.WeakTypeTag](
+      desc: UDTDescriptor): c.Expr[TypeInformation[T]] = {
     val tpeClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
     reify {
       new WritableTypeInfo[T](tpeClazz.splice)
@@ -153,7 +153,8 @@ private[flink] trait TypeInformationGen[C <: Context] {
     c.Expr[T](result)
   }
 
-//  def mkCaseClassTypeInfo[T: c.WeakTypeTag](desc: CaseClassDescriptor): c.Expr[TypeInformation[T]] = {
+//  def mkCaseClassTypeInfo[T: c.WeakTypeTag](
+//      desc: CaseClassDescriptor): c.Expr[TypeInformation[T]] = {
 //    val tpeClazz = c.Expr[Class[_]](Literal(Constant(desc.tpe)))
 //    val caseFields = mkCaseFields(desc)
 //    reify {
@@ -178,10 +179,12 @@ private[flink] trait TypeInformationGen[C <: Context] {
 //    c.Expr(mkMap(fields))
 //  }
 //
-//  protected def getFields(name: String, desc: UDTDescriptor): Seq[(String, UDTDescriptor)] = desc match {
+//  protected def getFields(name: String, desc: UDTDescriptor): Seq[(String, UDTDescriptor)] =
+//    desc match {
 //    // Flatten product types
 //    case CaseClassDescriptor(_, _, _, _, getters) =>
-//      getters filterNot { _.isBaseField } flatMap { f => getFields(name + "." + f.getter.name, f.desc) }
+//      getters filterNot { _.isBaseField } flatMap {
+//        f => getFields(name + "." + f.getter.name, f.desc) }
 //    case _ => Seq((name, desc))
 //  }
-}
\ No newline at end of file
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
index 2db2ff60302b24b70a610faa532ead13dc38df55..df345876688337d41fb12d11af9b9467a4a11686 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
@@ -129,4 +129,4 @@ private[flink] object CrossDataSetImpl {
     new CrossDataSetImpl(crossOperator, leftSet, rightSet)
   }
-}
\ No newline at end of file
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index c3f4dda51b3d5f1eaf522a5b033dfbc7aacd4ef3..a607301d846fbc9dc8d1d575d529bd41ee300360 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -228,4 +228,4 @@ private[flink] class UnfinishedJoinOperationImpl[T, O](
     new JoinDataSetImpl(joinOperator, leftSet.set, rightSet.set, leftKey, rightKey)
   }
-}
\ No newline at end of file
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index c63c9918f55bd797b951e5ea72b7f10673eb9909..73dd721c8a746f79de11ef0ebb888d6126de2acd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -7,7 +7,7 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -44,4 +44,4 @@ package object scala {
         "supported on Case Classes (for now).")
     }
   }
-}
\ No newline at end of file
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
index 3d21f05741d8547c6344caaafc8ae5b84590083f..1c8f8df422c4f3fe6b7c69bc66d0021a6b7e07f2 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
@@ -7,7 +7,7 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +15,6 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
index a63bb3556eabc02dc4c36b81b8e1f25f21b3636e..90d3b5bf97f3eff2060d19073af8ea9496982d82 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
@@ -7,7 +7,7 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +15,6 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.api.common.typeutils.TypeSerializer
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
index c191e8158aa801db7df464a631520c22be118ec8..ad407cbe072527a64661f3f0dc889edc772733de 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
@@ -15,7 +15,6 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.api.java.typeutils.{AtomicType, TupleTypeInfoBase}
@@ -79,8 +78,8 @@ abstract class ScalaTupleTypeInfo[T <: Product](
   def getFieldIndices(fields: Array[String]): Array[Int] = {
     val result = fields map { x => fieldNames.indexOf(x) }
     if (result.contains(-1)) {
-      throw new IllegalArgumentException("Fields '" + fields.mkString(", ") + "' are not valid for" +
-        " " + tupleClass + " with fields '" + fieldNames.mkString(", ") + "'.")
+      throw new IllegalArgumentException("Fields '" + fields.mkString(", ") +
+        "' are not valid for " + tupleClass + " with fields '" + fieldNames.mkString(", ") + "'.")
     }
     result
   }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala
index 5901e480282495c421d8963b52d0cec3d57041b9..8d8f7228555b8093949ea0248ba98dbad80db4dd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala
@@ -7,7 +7,7 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +15,6 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
 package org.apache.flink.api.scala.typeutils
 
 import scala.reflect.macros.Context
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
index 198388e357e9080e901d924b43600b0f7e76f8d0..85414816573cddef9bf4cf4211b4e40b0761b1e9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
@@ -71,7 +71,9 @@ private[flink] abstract class UnfinishedKeyPairOperation[T, O, R](
    * This only works on a CaseClass [[DataSet]].
    */
   def where(firstLeftField: String, otherLeftFields: String*) = {
-    val fieldIndices = fieldNames2Indices(leftSet.set.getType, firstLeftField +: otherLeftFields.toArray)
+    val fieldIndices = fieldNames2Indices(
+      leftSet.set.getType,
+      firstLeftField +: otherLeftFields.toArray)
 
     val leftKey = new FieldPositionKeys[T](fieldIndices, leftSet.set.getType)
     new HalfUnfinishedKeyPairOperation[T, O, R](this, leftKey)
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
index 5bfa9b40e1fd94b5b45b17df7c05dd56b0c4f749..5c4606f4027dcd62115c9fc25c75496ae8790e9a 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
@@ -1,12 +1,13 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
 *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,7 +15,6 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
 package org.apache.flink.api.scala
 
 import java.lang.reflect.Method
@@ -149,7 +149,8 @@ class ScalaAPICompletenessTest {
     checkMethods("SingleInputOperator", "DataSet",
       classOf[SingleInputOperator[_, _, _]], classOf[DataSet[_]])
 
-    checkMethods("TwoInputOperator", "DataSet", classOf[TwoInputOperator[_, _, _, _]], classOf[DataSet[_]])
+    checkMethods("TwoInputOperator", "DataSet",
+      classOf[TwoInputOperator[_, _, _, _]], classOf[DataSet[_]])
 
     checkMethods("SingleInputUdfOperator", "DataSet",
       classOf[SingleInputUdfOperator[_, _, _]], classOf[DataSet[_]])
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
index 0769891d3f09a84db51fe7c0f6b216369d959c88..29235d695cef2ef36ed13ab731ad01b466e7adb4 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
@@ -46,7 +46,7 @@ class SemanticPropertiesTranslationTest {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
-      val input = env.fromElements((3l, "test", 42))
+      val input = env.fromElements((3L, "test", 42))
       input.map(new WildcardConstantMapper[(Long, String, Int)]).print()
 
       val plan = env.createProgramPlan()
@@ -83,7 +83,7 @@ class SemanticPropertiesTranslationTest {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
-      val input = env.fromElements((3l, "test", 42))
+      val input = env.fromElements((3L, "test", 42))
       input.map(new IndividualConstantMapper[Long, String, Int]).print()
 
       val plan = env.createProgramPlan()
@@ -120,8 +120,8 @@ class SemanticPropertiesTranslationTest {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
-      val input1 = env.fromElements((3l, "test"))
-      val input2 = env.fromElements((3l, 3.1415))
+      val input1 = env.fromElements((3L, "test"))
+      val input2 = env.fromElements((3L, 3.1415))
 
       input1.join(input2).where(0).equalTo(0)(
         new ForwardingTupleJoin[Long, String, Long, Double]).print()
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
index 9b953ee3b96ea599772f1d857aed504972ba2472..c458e5692e3861c52b2908d536a5db162aa5ac53 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
@@ -100,7 +100,9 @@ class DeltaIterationTranslationTest {
 
       assertEquals(classOf[IdentityMapper[_]], worksetMapper.getUserCodeWrapper.getUserCodeClass)
 
-      assertEquals(classOf[NextWorksetMapper], nextWorksetMapper.getUserCodeWrapper.getUserCodeClass)
+      assertEquals(
+        classOf[NextWorksetMapper],
+        nextWorksetMapper.getUserCodeWrapper.getUserCodeClass)
 
       if (solutionSetJoin.getUserCodeWrapper.getUserCodeObject.isInstanceOf[WrappingFunction[_]]) {
 
@@ -203,7 +205,8 @@ class DeltaIterationTranslationTest {
 //      val iteration: DeltaIteration[Tuple3[Double, Long, String], Tuple2[Double,
 //        String]] = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1)
 //      try {
-//        iteration.getWorkset.coGroup(iteration.getSolutionSet).where(1).equalTo(2).`with`(new DeltaIterationTranslationTest.SolutionWorksetCoGroup1)
+//        iteration.getWorkset.coGroup(iteration.getSolutionSet).where(1).equalTo(2).`with`(
+//          new DeltaIterationTranslationTest.SolutionWorksetCoGroup1)
 //        fail("Accepted invalid program.")
 //      }
 //      catch {
@@ -211,7 +214,8 @@ class DeltaIterationTranslationTest {
 //        }
 //      }
 //      try {
-//        iteration.getSolutionSet.coGroup(iteration.getWorkset).where(2).equalTo(1).`with`(new DeltaIterationTranslationTest.SolutionWorksetCoGroup2)
+//        iteration.getSolutionSet.coGroup(iteration.getWorkset).where(2).equalTo(1).`with`(
+//          new DeltaIterationTranslationTest.SolutionWorksetCoGroup2)
 //        fail("Accepted invalid program.")
 //      }
 //      catch {
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
index 36aa4f21afdaf6cca7e2a98720c78157f65663af..e0a9c89116e191f43542280489647e6d0d086373 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
@@ -121,7 +121,9 @@ class ReduceTranslationTest {
       assertEquals(keyValueInfo, reducer.getOperatorInfo.getOutputType)
       assertEquals(keyValueInfo, keyProjector.getOperatorInfo.getInputType)
       assertEquals(initialData.set.getType, keyProjector.getOperatorInfo.getOutputType)
-      assertEquals(classOf[KeyExtractingMapper[_, _]], keyExtractor.getUserCodeWrapper.getUserCodeClass)
+      assertEquals(
+        classOf[KeyExtractingMapper[_, _]],
+        keyExtractor.getUserCodeWrapper.getUserCodeClass)
       assertTrue(keyExtractor.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
     }
     catch {
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index ae6f15d41e286a666157879c7b12a21e29c83d3b..84486e6abc0b57df3821329f6cd77c196a85c1b7 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -34,8 +34,8 @@ class TupleSerializerTest {
 
   @Test
   def testTuple1Int(): Unit = {
-    val testTuples =
-      Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), Tuple1(Int.MaxValue), Tuple1(Int.MinValue))
+    val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), Tuple1(Int.MaxValue),
+      Tuple1(Int.MinValue))
     runTests(testTuples)
   }
diff --git a/pom.xml b/pom.xml
index 6c424572c89850a3815d16e17f572dbe85a0c4d3..a339702cea6007839ef74e52bde6ee9b114dc068 100644
--- a/pom.xml
+++ b/pom.xml
@@ -522,8 +522,10 @@ under the License.
 						<exclude>**/*.creole</exclude>
 						<exclude>CONTRIBUTORS</exclude>
 						<exclude>DEPENDENCIES</exclude>
-
+						<exclude>tools/maven/checkstyle.xml</exclude>
+						<exclude>tools/maven/scalastyle-config.xml</exclude>
+						<exclude>**/scalastyle-output.xml</exclude>
 						<exclude>tools/maven/suppressions.xml</exclude>
 						<exclude>**/pom.xml</exclude>
 						<exclude>**/pom.hadoop2.xml</exclude>
@@ -556,6 +558,29 @@ under the License.
 					true
 
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
diff --git a/tools/maven/scalastyle-config.xml b/tools/maven/scalastyle-config.xml
new file mode 100644
index 0000000000000000000000000000000000000000..60d741855e9e1b0fb77bf779965f695229799f6b
--- /dev/null
+++ b/tools/maven/scalastyle-config.xml
@@ -0,0 +1,146 @@
+<!-- Scalastyle standard configuration: 146 lines of XML check definitions
+     (license header, line length, whitespace, and related rules); the rule
+     elements were lost in extraction and are not reproduced here. -->