diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
index 30230d646491c9c12527ab40040853f6a16a8464..9754a346cf537fbc157e2b93bc210649af8d6b60 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -94,7 +94,7 @@ public class TransitiveClosureNaive implements ProgramDescription {
 			}
 		});
 
-		DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths);
+		DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths);
 
 		// emit result
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
index 27c7d45788dd06c40a4eeabc2fec4c34335620f4..59df7fe0d60237e2c6b872975d3693dd1b1618cd 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
@@ -33,17 +33,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  */
 public class ConnectedComponentsData {
 
-	public static final Object[][] VERTICES = new Object[][] {
-		new Object[]{1L}, new Object[]{2L}, new Object[]{3L}, new Object[]{4L},
-		new Object[]{5L},new Object[]{6L}, new Object[]{7L}, new Object[]{8L},
-		new Object[]{9L}, new Object[]{10L}, new Object[]{11L}, new Object[]{12L},
-		new Object[]{13L}, new Object[]{14L}, new Object[]{15L}, new Object[]{16L}
-	};
+	public static final long[] VERTICES = new long[] {
+		1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
 
 	public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
 		List<Long> verticesList = new LinkedList<Long>();
-		for (Object[] vertex : VERTICES) {
-			verticesList.add((Long) vertex[0]);
+		for (long vertexId : VERTICES) {
+			verticesList.add(vertexId);
 		}
 		return env.fromCollection(verticesList);
 	}
diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml
index 3b50c8b715badda1538a6edd70c09bd5ccd055f5..79053d65fa0eee525a8ac60dc901038c7907a940 100644
--- a/flink-examples/flink-scala-examples/pom.xml
+++ b/flink-examples/flink-scala-examples/pom.xml
@@ -38,11 +38,11 @@ under the License.
 			<artifactId>flink-scala</artifactId>
 			<version>${project.version}</version>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
-			<version>${project.version}</version>
-		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
@@ -54,7 +54,7 @@ under the License.
 					<version>3.1.4</version>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
 					<id>scala-compile-first</id>
 					<phase>process-resources</phase>
@@ -64,7 +64,7 @@ under the License.
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) test-compile phase -->
 					<id>scala-test-compile</id>
 					<phase>process-test-resources</phase>
@@ -237,7 +237,7 @@ under the License.
-			-->
+			-->
 				<id>WordCount</id>
@@ -260,7 +260,7 @@ under the License.
-			<execution>
-				<id>TransitiveClosureNaive</id>
-				<phase>package</phase>
-				<goals>
-					<goal>jar</goal>
-				</goals>
+			<execution>
+				<id>TransitiveClosureNaive</id>
+				<phase>package</phase>
+				<goals>
+					<goal>jar</goal>
+				</goals>
-				<configuration>
-					<classifier>TransitiveClosureNaive</classifier>
+				<configuration>
+					<classifier>TransitiveClosureNaive</classifier>
-					<archive>
-						<manifestEntries>
-							<program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class>
-						</manifestEntries>
-					</archive>
+					<archive>
+						<manifestEntries>
+							<program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class>
+						</manifestEntries>
+					</archive>
-					<includes>
-						<include>**/wordcount/TransitiveClosureNaive*.class</include>
-					</includes>
-				</configuration>
-			</execution>
+					<includes>
+						<include>**/wordcount/TransitiveClosureNaive*.class</include>
+					</includes>
+				</configuration>
+			</execution>
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
index c920c31a1bcb14e3c8016a92c3dff2417cf2c977..fe121d50d72b5900dcc12b22905711aff8b60573 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -24,27 +24,26 @@ import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.util.Collector
 import org.apache.flink.examples.java.graph.util.EnumTrianglesData
 import org.apache.flink.api.common.operators.Order
-import scala.collection.mutable.MutableList
+
+import scala.collection.mutable
 
 /**
  * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
  * A triangle consists of three edges that connect three vertices with each other.
- * <p>
- * The algorithm works as follows:
+ * The algorithm works as follows:
 * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
 * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
 * that closes the triangle.
- * <p>
 * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Edges are represented as pairs for vertex IDs which are separated by space
- * characters. Edges are separated by new-line characters.<br>
- * For example "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
- * that include a triangle
- * </ul>
+ *
+ * - Edges are represented as pairs for vertex IDs which are separated by space
+ * characters. Edges are separated by new-line characters.
+ * For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) edges (1)-(2), (2)-(12),
+ * (1)-(12), and (42)-(63) that include a triangle
+ *
  *     (1)
  *     /  \
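The scaladoc in the hunk above describes the triad-building step only in prose: group all edges on a shared vertex, combine each pair of neighbors into a triad, and keep the triads whose closing edge exists. A minimal, self-contained sketch of that logic in plain Scala collections; the case classes, names, and sample data here are illustrative and not part of this patch:

```scala
// Hedged sketch of the triad-building idea from the scaladoc above, using
// plain Scala collections rather than the Flink DataSet API of the example.
object TriadSketch {
  case class Edge(v1: Int, v2: Int)           // assumes v1 < v2, as in the example
  case class Triad(v1: Int, v2: Int, v3: Int)

  def main(args: Array[String]): Unit = {
    // the four edges from the scaladoc example: "1 2\n2 12\n1 12\n42 63\n"
    val edges = Seq(Edge(1, 2), Edge(2, 12), Edge(1, 12), Edge(42, 63))
    val edgeSet = edges.map(e => (e.v1, e.v2)).toSet

    // group edges on the shared (smaller) vertex and pair up its neighbors,
    // mirroring the grouped, sorted reduce done by the TriadBuilder above
    val triads = for {
      (v1, group) <- edges.groupBy(_.v1).toSeq
      neighbors   =  group.map(_.v2).sorted
      (a, i)      <- neighbors.zipWithIndex
      b           <- neighbors.drop(i + 1)
    } yield Triad(v1, a, b)

    // keep only triads whose third edge exists, i.e. closed triangles
    val triangles = triads.filter(t => edgeSet.contains((t.v2, t.v3)))
    triangles.foreach(println)   // prints Triad(1,2,12)
  }
}
```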
@@ -59,13 +58,11 @@ import scala.collection.mutable.MutableList
  * If no parameters are provided, the program is run with default data from 
  * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]
  * 
- * <p>
 * This example shows how to use:
- * <ul>
- * <li>Custom Java objects which extend Tuple
- * <li>Group Sorting
- * </ul>
- * + * + * - Custom Java objects which extend Tuple + * - Group Sorting + * */ object EnumTrianglesBasic { @@ -91,7 +88,7 @@ object EnumTrianglesBasic { // emit result if (fileOutput) { - triangles.writeAsCsv(outputPath, "\n", " ") + triangles.writeAsCsv(outputPath, "\n", ",") } else { triangles.print() } @@ -119,12 +116,12 @@ object EnumTrianglesBasic { */ class TriadBuilder extends GroupReduceFunction[Edge, Triad] { - val vertices = MutableList[Integer]() + val vertices = mutable.MutableList[Integer]() override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = { // clear vertex list - vertices.clear + vertices.clear() // build and emit triads for(e <- edges.asScala) { @@ -153,10 +150,10 @@ object EnumTrianglesBasic { false } } else { - System.out.println("Executing Enum Triangles Basic example with built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: EnumTriangleBasic "); + System.out.println("Executing Enum Triangles Basic example with built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" Usage: EnumTriangleBasic ") } true } diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala index 80cce3505f453023f8b494af7f893f5e2ca474ef..937049134b55ac35490172dd22fe1c0f7b70295a 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala @@ -26,6 +26,8 @@ import org.apache.flink.examples.java.graph.util.EnumTrianglesData import org.apache.flink.api.common.operators.Order import scala.collection.mutable.MutableList +import scala.collection.mutable + /** * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/LineRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/LineRank.scala deleted file mode 100644 index 6902a6f2ad2609386af7c201de36f3e7d865eaec..0000000000000000000000000000000000000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/LineRank.scala +++ /dev/null @@ -1,96 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.flink.examples.scala.graph -// -//import org.apache.flink.client.LocalExecutor -//import org.apache.flink.api.common.{ ProgramDescription, Program } -// -//import org.apache.flink.api.scala._ -//import org.apache.flink.api.scala.operators._ -// -// -//class LineRank extends Program with Serializable { -// -// case class Edge(source: Int, target: Int, weight: Double) -// case class VectorElement(index: Int, value: Double) -// -// override def getPlan(args: String*) = { -// getScalaPlan(args(0).toInt, args(1), args(2), args(3).toInt, args(4)) -// } -// -// def sumElements(elem1: VectorElement, elem2: VectorElement) = VectorElement(elem1.index, elem1.value + elem2.value) -// -// def sgtTimes(SGT: DataSetOLD[Edge], vector: DataSetOLD[VectorElement]) = { -// SGT.join(vector).where(_.source).isEqualTo(_.index) -// .map((edge, elem) => VectorElement(edge.target, edge.weight * elem.value)) -// .groupBy(_.index).reduce(sumElements) -// } -// -// def tgTimes(TG: DataSetOLD[Edge], vector: DataSetOLD[VectorElement]) = { -// TG.join(vector).where(_.target).isEqualTo(_.index) -// .map((edge, elem) => VectorElement(edge.source, edge.weight * elem.value)) -// } -// -// def rescale(v3: DataSetOLD[VectorElement], c: Double, r: Double) = { -// v3.map(elem => { VectorElement(elem.index, c * elem.value + (1 - c) * r) }) -// } -// -// def powerMethod(SGT: DataSetOLD[Edge], TG: DataSetOLD[Edge], d: DataSetOLD[VectorElement], c: Double, r: Double)(v: DataSetOLD[VectorElement]) = { -// -// val v1 = d.join(v).where(_.index).isEqualTo(_.index) -// .map((leftElem, rightElem) => VectorElement(leftElem.index, leftElem.value * rightElem.value)) -// -// val v2 = sgtTimes(SGT, v1) -// val v3 = tgTimes(TG, v2) -// val nextV = rescale(v3, c, r) -// -// nextV -// } -// -// def getScalaPlan(numSubTasks: Int, sourceIncidenceMatrixPath: String, targetIncidenceMatrixPath: String, m: Int, -// outputPath: String) = { -// -// val c = .85 -// val r = 1.0 / m -// -// val SGT = DataSource(sourceIncidenceMatrixPath, CsvInputFormat[Edge]()) -// val TG = DataSource(targetIncidenceMatrixPath, CsvInputFormat[Edge]()) -// -// val d1 = SGT.map(edge => VectorElement(edge.target, edge.weight)) -// .groupBy(_.index) -// .reduce(sumElements) -// -// val d2 = tgTimes(TG, d1) -// -// val d = d2.map(elem => VectorElement(elem.index, 1 / elem.value)) -// -// val initialV1 = d.map(elem => VectorElement(elem.index, elem.value * m)) -// val initialV2 = sgtTimes(SGT, initialV1) -// val initialV3 = tgTimes(TG, initialV2) -// val initialV = rescale(initialV3, c, r) -// -// val v = initialV.iterate(5, powerMethod(SGT, TG, d, c, r)) -// -// val output = v.write(outputPath, CsvOutputFormat()) -// -// val plan = new ScalaPlan(Seq(output), "LineRank") -// plan.setDefaultParallelism(numSubTasks) -// plan -// } -//} \ No newline at end of file diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala index 25347ca8ac1ae70a4f66f5c52d7b0339f9b1e930..5416bb49bf8c95e9328ceb6c5278bb09ab832d93 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala +++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala @@ -109,4 +109,4 @@ object TransitiveClosureNaive { env.fromCollection(edgeData) } } -} +} \ No newline at end of file diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/iterative/TerminationCriterion.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/iterative/TerminationCriterion.scala deleted file mode 100644 index b9f226477f0f36c6e764e94ca5d04a34400f6f11..0000000000000000000000000000000000000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/iterative/TerminationCriterion.scala +++ /dev/null @@ -1,78 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// -//package org.apache.flink.examples.scala.iterative -// -//import org.apache.flink.client.LocalExecutor -//import org.apache.flink.api.common.Program -//import org.apache.flink.api.common.ProgramDescription -// -//import org.apache.flink.api.scala._ -//import org.apache.flink.api.scala.operators._ -// -///** -// * Example of using the bulk iteration with termination criterion with the -// * scala api. 
-// */ -//class TerminationCriterion extends Program with ProgramDescription with Serializable { -// override def getDescription() = { -// "Parameters: " -// } -// -// override def getPlan(args: String*) = { -// getScalaPlan(args(0).toInt, args(1)) -// } -// -// def getScalaPlan(maxNumberIterations: Int, resultOutput: String) = { -// val dataSource = CollectionDataSource[Double](List(1.0)) -// -// val halve = (partialSolution: DataSetOLD[Double]) => { -// partialSolution map { x => x /2 } -// } -// -// val terminationCriterion = (prev: DataSetOLD[Double], cur: DataSetOLD[Double]) => { -// val diff = prev cross cur map { (valuePrev, valueCurrent) => math.abs(valuePrev - valueCurrent) } -// diff filter { -// difference => difference > 0.1 -// } -// } -// -// val iteration = dataSource.iterateWithTermination(maxNumberIterations, halve, terminationCriterion) -// -// -// val sink = iteration.write(resultOutput, CsvOutputFormat()) -// -// val plan = new ScalaPlan(Seq(sink)) -// plan.setDefaultParallelism(1) -// plan -// } -//} -// -//object RunTerminationCriterion { -// def main(args: Array[String]) { -// val tc = new TerminationCriterion -// -// if(args.size < 2) { -// println(tc.getDescription()) -// return -// } -// val plan = tc.getScalaPlan(args(0).toInt, args(1)) -// LocalExecutor.execute(plan) -// } -//} diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala index 3e6627570f1ff86baa2accba6b869b4ad05b5ef7..95dcb9aedcf39d401fcdba1da8b3203e58696f89 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala @@ -28,58 +28,56 @@ import org.apache.flink.examples.java.ml.util.LinearRegressionData import scala.collection.JavaConverters._ /** - * This example implements a basic Linear Regression to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm. + * This example implements a basic Linear Regression to solve the y = theta0 + theta1*x problem + * using batch gradient descent algorithm. * - *
- * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br>
- * Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set.
- * In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters.
- * The algorithm terminates after a fixed number of iterations (as in this implementation)
+ * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering
+ * algorithm and works as follows:
+ *
+ * Giving a data set and target set, the BGD try to find out the best parameters for the data set
+ * to fit the target set.
+ * In each iteration, the algorithm computes the gradient of the cost function and use it to
+ * update all the parameters.
+ * The algorithm terminates after a fixed number of iterations (as in this implementation).
 * With enough iteration, the algorithm can minimize the cost function and find the best parameters
- * This is the Wikipedia entry for the Linear regression and Gradient descent algorithm.
- *
- * <p>
- * This implementation works on one-dimensional data. And find the two-dimensional theta.<br>
- * It find the best Theta parameter to fit the target.
- *
- * <p>
+ * This is the Wikipedia entry for the
+ * [[http://en.wikipedia.org/wiki/Linear_regression Linear regression]] and
+ * [[http://en.wikipedia.org/wiki/Gradient_descent Gradient descent algorithm]].
+ *
+ * This implementation works on one-dimensional data and finds the best two-dimensional theta to
+ * fit the target.
+ *
 * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Data points are represented as two double values separated by a blank character. The first one represent the X(the training data) and the second represent the Y(target).
- * Data points are separated by newline characters.<br>
- * For example "-0.02 -0.04\n5.3 10.6\n" gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
- * </ul>
- *
- * <p>
+ *
+ * - Data points are represented as two double values separated by a blank character. The first
+ * one represent the X(the training data) and the second represent the Y(target). Data points are
+ * separated by newline characters.
+ * For example `"-0.02 -0.04\n5.3 10.6\n"` gives two data points
+ * (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
+ *
 * This example shows how to use:
- * <ul>
- * <li>Bulk iterations
- * <li>Broadcast variables in bulk iterations
- * <li>Custom Java objects (PoJos)
- * </ul>
+ * + * - Bulk iterations + * - Broadcast variables in bulk iterations */ object LinearRegression { - // ************************************************************************* - // PROGRAM - // ************************************************************************* def main(args: Array[String]) { if (!parseParameters(args)) { return } val env = ExecutionEnvironment.getExecutionEnvironment - val data: DataSet[Data] = getDataSet(env) - val parameters: DataSet[Params] = getParamsDataSet(env) + val data = getDataSet(env) + val parameters = getParamsDataSet(env) + val result = parameters.iterate(numIterations) { currentParameters => val newParameters = data .map(new SubUpdate).withBroadcastSet(currentParameters, "parameters") - .reduce { (val1, val2) => - val new_theta0: Double = val1._1.getTheta0 + val2._1.getTheta0 - val new_theta1: Double = val1._1.getTheta1 + val2._1.getTheta1 - val result: Params = new Params(new_theta0, new_theta1) - (result, val1._2 + val2._2) - } + .reduce { (p1, p2) => + val result = p1._1 + p2._1 + (result, p1._2 + p2._2) + } .map { x => x._1.div(x._2) } newParameters } @@ -88,73 +86,28 @@ object LinearRegression { result.writeAsText(outputPath) } else { - result.print + result.print() } env.execute("Scala Linear Regression example") } - // ************************************************************************* - // DATA TYPES - // ************************************************************************* /** * A simple data sample, x means the input, and y means the target. */ - class Data extends Serializable { - - def this(x: Double, y: Double) { - this() - this.x = x - this.y = y - } - - override def toString: String = { - "(" + x + "|" + y + ")" - } - - var x: Double = .0 - var y: Double = .0 - } + case class Data(var x: Double, var y: Double) /** * A set of parameters -- theta0, theta1. */ - class Params extends Serializable { + case class Params(theta0: Double, theta1: Double) { + def div(a: Int): Params = { + Params(theta0 / a, theta1 / a) + } - def this(x0: Double, x1: Double) { - this() - this.theta0 = x0 - this.theta1 = x1 - } - - override def toString: String = { - theta0 + " " + theta1 - } - - def getTheta0: Double = { - theta0 - } - - def getTheta1: Double = { - theta1 - } - - def setTheta0(theta0: Double) { - this.theta0 = theta0 - } - - def setTheta1(theta1: Double) { - this.theta1 = theta1 - } - - def div(a: Integer): Params = { - this.theta0 = theta0 / a - this.theta1 = theta1 / a - return this - } - - private var theta0: Double = .0 - private var theta1: Double = .0 - } + def +(other: Params) = { + Params(theta0 + other.theta0, theta1 + other.theta1) + } + } // ************************************************************************* // USER FUNCTIONS @@ -163,24 +116,22 @@ object LinearRegression { /** * Compute a single BGD type update for every parameters. */ - class SubUpdate extends RichMapFunction[Data, Tuple2[Params, Integer]] { + class SubUpdate extends RichMapFunction[Data, (Params, Int)] { - private var parameters: Traversable[Params] = null - var parameter: Params = null - private var count: Int = 1 + private var parameter: Params = null /** Reads the parameters from a broadcast variable into a collection. 
*/ override def open(parameters: Configuration) { - this.parameters = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala + val parameters = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala + parameter = parameters.head } - def map(in: Data): Tuple2[Params, Integer] = { - for (p <- parameters) { - this.parameter = p - } - val theta_0: Double = parameter.getTheta0 - 0.01 * ((parameter.getTheta0 + (parameter.getTheta1 * in.x)) - in.y) - val theta_1: Double = parameter.getTheta1 - 0.01 * (((parameter.getTheta0 + (parameter.getTheta1 * in.x)) - in.y) * in.x) - new Tuple2[Params, Integer](new Params(theta_0, theta_1), count) + def map(in: Data): (Params, Int) = { + val theta0 = + parameter.theta0 - 0.01 * ((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) + val theta1 = + parameter.theta1 - 0.01 * (((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) * in.x) + (Params(theta0, theta1), 1) } } @@ -198,7 +149,7 @@ object LinearRegression { if (programArguments.length == 3) { dataPath = programArguments(0) outputPath = programArguments(1) - numIterations = Integer.parseInt(programArguments(2)) + numIterations = programArguments(2).toInt } else { System.err.println("Usage: LinearRegression ") @@ -206,11 +157,13 @@ object LinearRegression { } } else { - System.out.println("Executing Linear Regression example with default parameters and built-in default data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println(" We provide a data generator to create synthetic input files for this program.") - System.out.println(" Usage: LinearRegression ") + System.out.println("Executing Linear Regression example with default parameters and " + + "built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" We provide a data generator to create synthetic input files for this " + + "program.") + System.out.println(" Usage: LinearRegression ") } true } @@ -225,7 +178,7 @@ object LinearRegression { } else { val data = LinearRegressionData.DATA map { - case Array(x, y) => new Data(x.asInstanceOf[Double], y.asInstanceOf[Double]) + case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double]) } env.fromCollection(data) } @@ -233,7 +186,7 @@ object LinearRegression { private def getParamsDataSet(env: ExecutionEnvironment): DataSet[Params] = { val params = LinearRegressionData.PARAMS map { - case Array(x, y) => new Params(x.asInstanceOf[Double], y.asInstanceOf[Double]) + case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double]) } env.fromCollection(params) } diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala deleted file mode 100644 index 5e7d7f334949247bedb0d5b0cfb2b77a46ee8a2f..0000000000000000000000000000000000000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala +++ /dev/null @@ -1,107 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. 
See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// -//package org.apache.flink.examples.scala.relational; -// -//import org.apache.flink.client.LocalExecutor -//import org.apache.flink.api.common.Program -//import org.apache.flink.api.common.ProgramDescription -// -//import org.apache.flink.api.scala._ -//import org.apache.flink.api.scala.operators._ -// -// -///** -// * The TPC-H is a decision support benchmark on relational data. -// * Its documentation and the data generator (DBGEN) can be found -// * on http://www.tpc.org/tpch/ . -// * -// * This Flink program implements a modified version of the query 3 of -// * the TPC-H benchmark including one join, some filtering and an -// * aggregation. The query resembles the following SQL statement: -// *
-// * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
-// *   FROM orders, lineitem
-// *   WHERE l_orderkey = o_orderkey
-// *     AND o_orderstatus = "X"
-// *     AND YEAR(o_orderdate) > Y
-// *     AND o_orderpriority LIKE "Z%"
-// *   GROUP BY l_orderkey, o_shippriority;
-// *
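For reference, the filter, join, and aggregate shape of the SQL above can be sketched in plain Scala collections as follows. This is only an illustration of the query's structure; the case classes and sample rows are invented, and the deleted example implemented it with the old Scala API instead:

```scala
// Hedged sketch of the modified TPC-H Q3 shape from the comment above;
// plain Scala collections, all data illustrative.
object Tpch3Sketch {
  case class Order(orderId: Int, status: Char, year: Int, priority: String, shipPriority: Int)
  case class LineItem(orderId: Int, extendedPrice: Double)

  def main(args: Array[String]): Unit = {
    val orders    = Seq(Order(1, 'F', 1994, "5-LOW", 0), Order(2, 'O', 1994, "1-URGENT", 0))
    val lineItems = Seq(LineItem(1, 10.0), LineItem(1, 5.5), LineItem(2, 7.0))

    // WHERE o_orderstatus = "X" AND YEAR(o_orderdate) > Y AND o_orderpriority LIKE "Z%"
    val filtered = orders.filter(o =>
      o.status == 'F' && o.year > 1993 && o.priority.startsWith("5"))

    // WHERE l_orderkey = o_orderkey (the join), projecting the grouping key
    val joined = for {
      o  <- filtered
      li <- lineItems if li.orderId == o.orderId
    } yield ((o.orderId, o.shipPriority), li.extendedPrice)

    // GROUP BY l_orderkey, o_shippriority with sum(l_extendedprice) as revenue
    val revenue = joined.groupBy(_._1).map { case (key, rows) => (key, rows.map(_._2).sum) }
    revenue.foreach(println)   // ((1,0),15.5)
  }
}
```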
-// */ -//class RelationalQuery extends Program with ProgramDescription with Serializable { -// -// case class Order(orderId: Int, status: Char, year: Int, orderPriority: String, shipPriority: Int) -// case class LineItem(orderId: Int, extendedPrice: Double) -// case class PrioritizedOrder(orderId: Int, shipPriority: Int, revenue: Double) -// -// -// def getScalaPlan(numSubTasks: Int, ordersInput: String, lineItemsInput: String, ordersOutput: String, status: Char = 'F', minYear: Int = 1993, priority: String = "5") = { -// -// // ORDER intput: parse as CSV and select relevant fields -// val orders = DataSource(ordersInput, CsvInputFormat[(Int, String, String, String, String, String, String, Int)]("\n", '|')) -// .map { t => Order(t._1, t._3.charAt(0), t._5.substring(0,4).toInt, t._6, t._8) } -// -// // ORDER intput: parse as CSV and select relevant fields -// val lineItems = DataSource(lineItemsInput, CsvInputFormat[(Int, String, String, String, String, Double)]("\n", '|')) -// .map { t => LineItem(t._1, t._6) } -// -// // filter the orders input -// val filteredOrders = orders filter { o => o.status == status && o.year > minYear && o.orderPriority.startsWith(priority) } -// -// // join the filteres result with the lineitem input -// val prioritizedItems = filteredOrders join lineItems where { _.orderId } isEqualTo { _.orderId } map { (o, li) => PrioritizedOrder(o.orderId, o.shipPriority, li.extendedPrice) } -// -// // group by and sum the joined data -// val prioritizedOrders = prioritizedItems groupBy { pi => (pi.orderId, pi.shipPriority) } reduce { (po1, po2) => po1.copy(revenue = po1.revenue + po2.revenue) } -// -// // write the result as csv -// val output = prioritizedOrders.write(ordersOutput, CsvOutputFormat("\n", "|")) -// -// val plan = new ScalaPlan(Seq(output), "Relational Query") -// plan.setDefaultParallelism(numSubTasks) -// plan -// } -// -// override def getDescription() = { -// "Parameters: , , , " -// } -// override def getPlan(args: String*) = { -// getScalaPlan(args(3).toInt, args(0), args(1), args(2)) -// } -//} -// -// -///** -// * Entry point to make the example standalone runnable with the local executor -// */ -//object RunRelationalQuery { -// -// def main(args: Array[String]) { -// val query = new RelationalQuery -// -// if (args.size < 4) { -// println(query.getDescription) -// return -// } -// val plan = query.getScalaPlan(args(3).toInt, args(0), args(1), args(2)) -// LocalExecutor.execute(plan) -// } -//} -// diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/testing/KMeansForTest.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/testing/KMeansForTest.scala deleted file mode 100644 index 52bfc1510f7366ededb04f84402586aafb11f8f9..0000000000000000000000000000000000000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/testing/KMeansForTest.scala +++ /dev/null @@ -1,105 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. 
You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// -//package org.apache.flink.examples.scala.testing -// -//// Uncomment if you need to rebuild it for PackagedProgramEndToEndTest -//// -////import org.apache.flink.api.common.Program -////import org.apache.flink.api.common.ProgramDescription -//// -////import org.apache.flink.api.scala._ -////import org.apache.flink.api.scala.operators._ -//// -//// -////class KMeansForTest extends Program with ProgramDescription { -//// -//// override def getPlan(args: String*) = { -//// getScalaPlan(args(0).toInt, args(1), args(2), args(3), args(4).toInt) -//// } -//// -//// case class Point(x: Double, y: Double, z: Double) { -//// def computeEuclidianDistance(other: Point) = other match { -//// case Point(x2, y2, z2) => math.sqrt(math.pow(x - x2, 2) + math.pow(y - y2, 2) + math.pow(z - z2, 2)) -//// } -//// } -//// -//// case class Distance(dataPoint: Point, clusterId: Int, distance: Double) -//// -//// def asPointSum = (pid: Int, dist: Distance) => dist.clusterId -> PointSum(1, dist.dataPoint) -//// -//// // def sumPointSums = (dataPoints: Iterator[(Int, PointSum)]) => dataPoints.reduce { (z, v) => z.copy(_2 = z._2 + v._2) } -//// def sumPointSums = (dataPoints: Iterator[(Int, PointSum)]) => { -//// dataPoints.reduce { (z, v) => z.copy(_2 = z._2 + v._2) } -//// } -//// -//// -//// case class PointSum(count: Int, pointSum: Point) { -//// def +(that: PointSum) = that match { -//// case PointSum(c, Point(x, y, z)) => PointSum(count + c, Point(x + pointSum.x, y + pointSum.y, z + pointSum.z)) -//// } -//// -//// def toPoint() = Point(round(pointSum.x / count), round(pointSum.y / count), round(pointSum.z / count)) -//// -//// // Rounding ensures that we get the same results in a multi-iteration run -//// // as we do in successive single-iteration runs, since the output format -//// // only contains two decimal places. 
-//// private def round(d: Double) = math.round(d * 100.0) / 100.0; -//// } -//// -//// def parseInput = (line: String) => { -//// val PointInputPattern = """(\d+)\|-?(\d+\.\d+)\|-?(\d+\.\d+)\|-?(\d+\.\d+)\|""".r -//// val PointInputPattern(id, x, y, z) = line -//// (id.toInt, Point(x.toDouble, y.toDouble, z.toDouble)) -//// } -//// -//// def formatOutput = (cid: Int, p: Point) => "%d|%.2f|%.2f|%.2f|".format(cid, p.x, p.y, p.z) -//// -//// def computeDistance(p: (Int, Point), c: (Int, Point)) = { -//// val ((pid, dataPoint), (cid, clusterPoint)) = (p, c) -//// val distToCluster = dataPoint.computeEuclidianDistance(clusterPoint) -//// -//// pid -> Distance(dataPoint, cid, distToCluster) -//// } -//// -//// -//// def getScalaPlan(numSubTasks: Int, dataPointInput: String, clusterInput: String, clusterOutput: String, numIterations: Int) = { -//// val dataPoints = DataSource(dataPointInput, DelimitedInputFormat(parseInput)) -//// val clusterPoints = DataSource(clusterInput, DelimitedInputFormat(parseInput)) -//// -//// val finalCenters = clusterPoints.iterate(numIterations, { centers => -//// -//// val distances = dataPoints cross centers map computeDistance -//// val nearestCenters = distances groupBy { case (pid, _) => pid } reduceGroup { ds => ds.minBy(_._2.distance) } map asPointSum.tupled -//// val newCenters = nearestCenters groupBy { case (cid, _) => cid } reduceGroup sumPointSums map { case (cid, pSum) => cid -> pSum.toPoint() } -//// -//// newCenters -//// }) -//// -//// val output = finalCenters.write(clusterOutput, DelimitedOutputFormat(formatOutput.tupled)) -//// -//// val plan = new ScalaPlan(Seq(output), "KMeans Iteration (ONLY FOR TESTING)") -//// plan.setDefaultParallelism(numSubTasks) -//// plan -//// } -//// -//// override def getDescription() = { -//// "Parameters: [numSubStasksS] [dataPoints] [clusterCenters] [output] [numIterations]" -//// } -////} \ No newline at end of file diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithCount.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithCount.scala deleted file mode 100644 index f71b18deec3507688f9713f4c4ee65e93ac3a3ac..0000000000000000000000000000000000000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithCount.scala +++ /dev/null @@ -1,63 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. 
-// */ -// -// -//package org.apache.flink.examples.scala.wordcount -// -//import org.apache.flink.client.LocalExecutor -//import org.apache.flink.api.common.Program -//import org.apache.flink.api.common.ProgramDescription -// -//import org.apache.flink.api.scala._ -//import org.apache.flink.api.scala.operators._ -// -// -///** -// * Implementation of word count in Scala. This example uses the built in count function for tuples. -// */ -//class WordCountWithCount extends WordCount { -// -// override def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = { -// val input = TextFile(textInput) -// -// val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } } -// val counts = words groupBy { x => x } count() -// -// val output = counts.write(wordsOutput, CsvOutputFormat("\n", " ")) -// -// val plan = new ScalaPlan(Seq(output), "Word Count") -// plan.setDefaultParallelism(numSubTasks) -// plan -// } -//} -// -// -///** -// * Entry point to make the example standalone runnable with the local executor. -// */ -//object RunWordCountWithCount { -// def main(args: Array[String]) { -// val wc = new WordCountWithCount -// if (args.size < 3) { -// println(wc.getDescription) -// return -// } -// val plan = wc.getScalaPlan(args(0).toInt, args(1), args(2)) -// LocalExecutor.execute(plan) -// } -//} diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithUserDefinedType.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithUserDefinedType.scala deleted file mode 100644 index 2ee0c4363548ccb8f0b410abc3d3f7473d887140..0000000000000000000000000000000000000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithUserDefinedType.scala +++ /dev/null @@ -1,59 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// -//package org.apache.flink.examples.scala.wordcount -// -//import org.apache.flink.client.LocalExecutor -//import org.apache.flink.api.common.Program -//import org.apache.flink.api.common.ProgramDescription -// -//import org.apache.flink.types.IntValue -//import org.apache.flink.types.StringValue -// -//import org.apache.flink.api.scala._ -//import org.apache.flink.api.scala.operators._ -// -// -///** -// * Implementation of word count in Scala, using a user defined type rather than one of the -// * built-in supported types like primitives, tuples, or other (nested) case classes. 
-// */ -//class WordCountWithUserDefinedType extends Program with Serializable { -// -// def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = { -// val input = TextFile(textInput) -// -// val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } map { w => (new StringValue(w), new IntValue(1)) } } -// -// val counts = words -// .groupBy { case (word, _) => word } -// .reduce { (w1, w2) => (w1._1, new IntValue(w1._2.getValue + w2._2.getValue)) } -// -// val output = counts.write(wordsOutput, CsvOutputFormat("\n", " ")) -// -// val plan = new ScalaPlan(Seq(output), "Word Count (immutable)") -// plan.setDefaultParallelism(numSubTasks) -// plan -// } -// -// -// override def getPlan(args: String*) = { -// getScalaPlan(args(0).toInt, args(1), args(2)) -// } -//} diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml index 45db3909b35030f0692571a73de21cc1b3cf3fb3..dfd941925dc8c5a9545c5c4ce4bd34993809da50 100644 --- a/flink-scala/pom.xml +++ b/flink-scala/pom.xml @@ -39,23 +39,23 @@ under the License. ${project.version} - - org.apache.flink - flink-core - ${project.version} - test-jar - test - - - - org.apache.flink - flink-java - ${project.version} - test-jar - test - - - + + org.apache.flink + flink-core + ${project.version} + test-jar + test + + + + org.apache.flink + flink-java + ${project.version} + test-jar + test + + + org.apache.flink flink-java ${project.version} @@ -108,7 +108,7 @@ under the License. 3.1.4 + scala classes can be resolved later in the (Java) compile phase --> scala-compile-first process-resources @@ -118,7 +118,7 @@ under the License. + scala classes can be resolved later in the (Java) test-compile phase --> scala-test-compile process-test-resources diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index d04f96817a9acae23915480f8222861de61a0ead..e9730934cbbc01275bdc3527002f5fb2c44f8830 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -635,14 +635,12 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) { * val left: DataSet[(String, Int, Int)] = ... * val right: DataSet[(Int, String, Int)] = ... * val joined = left.join(right).where(0).isEqualTo(1) { (l, r) => - * if (l._2 > 4) { - * Some((l._2, r._3)) - * } else { - * None - * } + * (l._1, r._2) * } * }}} - * This can be used to implement a filter directly in the join or to output more than one values: + * A join function with a [[Collector]] can be used to implement a filter directly in the join + * or to output more than one values. This type of join function does not return a value, instead + * values are emitted using the collector: * {{{ * val left: DataSet[(String, Int, Int)] = ... * val right: DataSet[(Int, String, Int)] = ... @@ -696,11 +694,12 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) { * val right: DataSet[(Int, String, Int)] = ... * val coGrouped = left.coGroup(right).where(0).isEqualTo(1) { (l, r) => * // l and r are of type TraversableOnce - * Some((l.min, r.max)) + * (l.min, r.max) * } * }}} - * This can be used to implement a filter directly in the coGroup or to output more than one - * values: + * A coGroup function with a [[Collector]] can be used to implement a filter directly in the + * coGroup or to output more than one values. 
This type of coGroup function does not return a + * value, instead values are emitted using the collector * {{{ * val left: DataSet[(String, Int, Int)] = ... * val right: DataSet[(Int, String, Int)] = ... diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala index a7ca8210727f91dcc7119f1efdd34f71e1d3c42e..802fd09ff6910fe5f96d32d810bc7fd509b66fdc 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala @@ -144,11 +144,9 @@ trait GroupedDataSet[T] { } /** - * /** * Private implementation for [[GroupedDataSet]] to keep the implementation details, i.e. the * parameters of the constructor, hidden. */ - */ private[flink] class GroupedDataSetImpl[T: ClassTag]( private val set: JavaDataSet[T], private val keys: Keys[T]) @@ -256,7 +254,7 @@ private[flink] class GroupedDataSetImpl[T: ClassTag]( } def reduceGroup[R: TypeInformation: ClassTag]( - fun: (TraversableOnce[T]) => R): DataSet[R] = { + fun: (TraversableOnce[T]) => R): DataSet[R] = { Validate.notNull(fun, "Group reduce function must not be null.") val reducer = new GroupReduceFunction[T, R] { def reduce(in: java.lang.Iterable[T], out: Collector[R]) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java deleted file mode 100644 index 2426bc9589844196bfce6e596a6d6eed8a64be80..0000000000000000000000000000000000000000 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java +++ /dev/null @@ -1,39 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. 
-// */ -// -// -//package org.apache.flink.test.exampleScalaPrograms; -// -//import org.apache.flink.api.common.Plan; -//import org.apache.flink.configuration.Configuration; -//import org.apache.flink.examples.scala.graph.ComputeEdgeDegrees; -// -//public class ComputeEdgeDegreesITCase extends org.apache.flink.test.recordJobTests.ComputeEdgeDegreesITCase { -// -// public ComputeEdgeDegreesITCase(Configuration config) { -// super(config); -// } -// -// @Override -// protected Plan getTestJob() { -// ComputeEdgeDegrees computeDegrees = new ComputeEdgeDegrees(); -// return computeDegrees.getScalaPlan( -// config.getInteger("ComputeEdgeDegreesTest#NumSubtasks", DOP), -// edgesPath, resultPath); -// } -//} diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java index 69a5c9a84a14579d08cab6574174c6f1877dbb13..71a7e23af264c33bd1b181c48245af34cc4afd71 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java @@ -1,34 +1,60 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// -//package org.apache.flink.test.exampleScalaPrograms; -// -//import org.apache.flink.api.common.Plan; -//import org.apache.flink.examples.scala.graph.ConnectedComponents; -// -//public class ConnectedComponentsITCase extends org.apache.flink.test.iterative.ConnectedComponentsITCase { -// -// @Override -// protected Plan getTestJob() { -// ConnectedComponents cc = new ConnectedComponents(); -// Plan plan = cc.getScalaPlan(verticesPath, edgesPath, resultPath, 100); -// plan.setDefaultParallelism(DOP); -// return plan; -// } -//} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.test.exampleScalaPrograms; + +import org.apache.flink.examples.scala.graph.ConnectedComponents; +import org.apache.flink.test.testdata.ConnectedComponentsData; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.io.BufferedReader; + +public class ConnectedComponentsITCase extends JavaProgramTestBase { + + private static final long SEED = 0xBADC0FFEEBEEFL; + + private static final int NUM_VERTICES = 1000; + + private static final int NUM_EDGES = 10000; + + + private String verticesPath; + private String edgesPath; + private String resultPath; + + + @Override + protected void preSubmit() throws Exception { + verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES)); + edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED)); + resultPath = getTempFilePath("results"); + } + + @Override + protected void testProgram() throws Exception { + ConnectedComponents.main(new String[] {verticesPath, edgesPath, resultPath, "100"}); + } + + @Override + protected void postSubmit() throws Exception { + for (BufferedReader reader : getResultReader(resultPath)) { + ConnectedComponentsData.checkOddEvenResult(reader); + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java new file mode 100644 index 0000000000000000000000000000000000000000..5c1987660ab97dde3a39b9b54c6fea57f8f0c2f8 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.exampleScalaPrograms; + +import org.apache.flink.examples.scala.graph.EnumTrianglesBasic; +import org.apache.flink.test.testdata.EnumTriangleData; +import org.apache.flink.test.util.JavaProgramTestBase; + +public class EnumTriangleBasicITCase extends JavaProgramTestBase { + + protected String edgePath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + edgePath = createTempFile("edges", EnumTriangleData.EDGES); + resultPath = getTempDirPath("triangles"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_ID, resultPath); + } + + @Override + protected void testProgram() throws Exception { + EnumTrianglesBasic.main(new String[] { edgePath, resultPath }); + } + +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java new file mode 100644 index 0000000000000000000000000000000000000000..944aaf4fb8e341cc444e329d27a0acc36377aacb --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.exampleScalaPrograms; + +import org.apache.flink.examples.scala.graph.EnumTrianglesOpt; +import org.apache.flink.test.testdata.EnumTriangleData; +import org.apache.flink.test.util.JavaProgramTestBase; + +public class EnumTriangleOptITCase extends JavaProgramTestBase { + + protected String edgePath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + edgePath = createTempFile("edges", EnumTriangleData.EDGES); + resultPath = getTempDirPath("triangles"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_DEGREE, resultPath); + } + + @Override + protected void testProgram() throws Exception { + EnumTrianglesOpt.main(new String[] { edgePath, resultPath }); + } + +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java deleted file mode 100644 index ab8c56398f3be275bb3dea56900a28cb439702c9..0000000000000000000000000000000000000000 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java +++ /dev/null @@ -1,39 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. 
See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// -//package org.apache.flink.test.exampleScalaPrograms; -// -//import org.apache.flink.api.common.Plan; -//import org.apache.flink.configuration.Configuration; -//import org.apache.flink.examples.scala.graph.EnumTrianglesOnEdgesWithDegrees; -// -//public class EnumTrianglesOnEdgesWithDegreesITCase extends org.apache.flink.test.recordJobTests.EnumTrianglesOnEdgesWithDegreesITCase { -// -// public EnumTrianglesOnEdgesWithDegreesITCase(Configuration config) { -// super(config); -// } -// -// @Override -// protected Plan getTestJob() { -// EnumTrianglesOnEdgesWithDegrees enumTriangles = new EnumTrianglesOnEdgesWithDegrees(); -// return enumTriangles.getScalaPlan( -// config.getInteger("EnumTrianglesTest#NumSubtasks", DOP), -// edgesPath, resultPath); -// } -//} diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java new file mode 100644 index 0000000000000000000000000000000000000000..2369a4b62f3d28cf19e802755f16516c90c946a5 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.scala.graph.PageRankBasic;
+import org.apache.flink.test.testdata.PageRankData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase extends JavaProgramTestBase {
+
+	private static final int NUM_PROGRAMS = 2;
+
+	private int curProgId = config.getInteger("ProgramId", -1);
+
+	private String verticesPath;
+	private String edgesPath;
+	private String resultPath;
+	private String expectedResult;
+
+	public PageRankITCase(Configuration config) {
+		super(config);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+		verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
+		edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = runProgram(curProgId);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
+	}
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for (int i = 1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+
+	public String runProgram(int progId) throws Exception {
+
+		switch (progId) {
+		case 1: {
+			PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
+			return PageRankData.RANKS_AFTER_3_ITERATIONS;
+		}
+		case 2: {
+			// start with a very high number of iterations so that the dynamic convergence criterion has to trigger termination
+			PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
+			return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
+		}
+
+		default:
+			throw new IllegalArgumentException("Invalid program id");
+		}
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/RelationalQueryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/RelationalQueryITCase.java
deleted file mode 100644
index ca5a7079d6062fc04401ad26a071aa4085f2c7e7..0000000000000000000000000000000000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/RelationalQueryITCase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. 
You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// -//package org.apache.flink.test.exampleScalaPrograms; -// -//import org.apache.flink.api.common.Plan; -//import org.apache.flink.configuration.Configuration; -//import org.apache.flink.examples.scala.relational.RelationalQuery; -//import org.junit.runner.RunWith; -//import org.junit.runners.Parameterized; -// -//import java.util.Locale; -// -//@RunWith(Parameterized.class) -//public class RelationalQueryITCase extends org.apache.flink.test.recordJobTests.TPCHQuery3ITCase { -// -// public RelationalQueryITCase(Configuration config) { -// super(config); -// Locale.setDefault(Locale.US); -// } -// -// @Override -// protected Plan getTestJob() { -// -// RelationalQuery tpch3 = new RelationalQuery(); -// return tpch3.getScalaPlan( -// config.getInteger("dop", 1), -// ordersPath, -// lineitemsPath, -// resultPath, -// 'F', 1993, "5"); -// } -//} diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java deleted file mode 100644 index 4d0eb24e92102297d12bd76f0d31870380294de6..0000000000000000000000000000000000000000 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java +++ /dev/null @@ -1,54 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. 
-// */ -// -// -//package org.apache.flink.test.exampleScalaPrograms; -// -//import org.apache.flink.api.common.Plan; -//import org.apache.flink.examples.scala.graph.TransitiveClosureNaive; -//import org.apache.flink.test.util.RecordAPITestBase; -// -//public class TransitiveClosureNaiveITCase extends RecordAPITestBase { -// -// protected String verticesPath = null; -// protected String edgesPath = null; -// protected String resultPath = null; -// -// private static final String VERTICES = "0\n1\n2"; -// private static final String EDGES = "0|1\n1|2"; -// private static final String EXPECTED = "0|0|0\n0|1|1\n0|2|2\n1|1|0\n1|2|1\n2|2|0"; -// -// @Override -// protected void preSubmit() throws Exception { -// verticesPath = createTempFile("vertices.txt", VERTICES); -// edgesPath = createTempFile("edges.txt", EDGES); -// resultPath = getTempDirPath("transitiveClosure"); -// } -// -// @Override -// protected Plan getTestJob() { -// TransitiveClosureNaive transitiveClosureNaive = new TransitiveClosureNaive(); -// // "2" is the number of iterations here -// return transitiveClosureNaive.getScalaPlan(DOP, 2, verticesPath, edgesPath, resultPath); -// } -// -// @Override -// protected void postSubmit() throws Exception { -// compareResultsByLinesInMemory(EXPECTED, resultPath); -// } -//} diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java index 63c598cf20050d1750fbe6952c7f86668cf46918..2d8ad319af9c42f878ca25e083f911a012b080ae 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java @@ -1,32 +1,49 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// -//package org.apache.flink.test.exampleScalaPrograms; -// -//import org.apache.flink.api.common.Plan; -//import org.apache.flink.examples.scala.relational.WebLogAnalysis; -// -//public class WebLogAnalysisITCase extends org.apache.flink.test.recordJobTests.WebLogAnalysisITCase { -// -// @Override -// protected Plan getTestJob() { -// WebLogAnalysis webLogAnalysis = new WebLogAnalysis(); -// return webLogAnalysis.getScalaPlan(DOP, docsPath, ranksPath, visitsPath, resultPath); -// } -//} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+import org.apache.flink.examples.scala.relational.WebLogAnalysis;
+import org.apache.flink.test.testdata.WebLogAnalysisData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class WebLogAnalysisITCase extends JavaProgramTestBase {
+
+	private String docsPath;
+	private String ranksPath;
+	private String visitsPath;
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
+		ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
+		visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath});
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountPactValueITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountPactValueITCase.java
deleted file mode 100644
index 94ec2243484f07d80776e98be9f06b52953abfc2..0000000000000000000000000000000000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountPactValueITCase.java
+++ /dev/null
@@ -1,33 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License. 
-// */ -// -// -//package org.apache.flink.test.exampleScalaPrograms; -// -//import org.apache.flink.api.common.Plan; -//import org.apache.flink.examples.scala.wordcount.WordCountWithUserDefinedType; -// -// -//public class WordCountPactValueITCase extends org.apache.flink.test.recordJobTests.WordCountITCase { -// -// @Override -// protected Plan getTestJob() { -// WordCountWithUserDefinedType wc = new WordCountWithUserDefinedType(); -// return wc.getScalaPlan(DOP, textPath, resultPath); -// } -//} diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java deleted file mode 100644 index 5f53f72b60a22132960c2bb4e58d81df226416df..0000000000000000000000000000000000000000 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java +++ /dev/null @@ -1,31 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// -//package org.apache.flink.test.exampleScalaPrograms; -// -//import org.apache.flink.api.common.Plan; -//import org.apache.flink.examples.scala.wordcount.WordCountWithCount; -// -//public class WordCountWithCountFunctionITCase extends org.apache.flink.test.recordJobTests.WordCountITCase { -// -// @Override -// protected Plan getTestJob() { -// return new WordCountWithCount().getScalaPlan(DOP, textPath, resultPath); -// } -//}
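
Note: the migrated ITCases in this patch all follow the same JavaProgramTestBase life cycle: preSubmit() materializes test input as temporary files, testProgram() invokes the example's main() exactly as a user would, and postSubmit() compares the produced output against reference data. A minimal sketch of that pattern follows (ExampleITCase, SomeExample, and the literal data are hypothetical placeholders, not part of this change):

package org.apache.flink.test.exampleScalaPrograms;

import org.apache.flink.test.util.JavaProgramTestBase;

// Hypothetical sketch of the test pattern used by the ITCases in this change.
public class ExampleITCase extends JavaProgramTestBase {

	private String inputPath;
	private String resultPath;

	@Override
	protected void preSubmit() throws Exception {
		// Write the input data to a temp file and reserve a temp output directory.
		inputPath = createTempFile("input", "hello world\nhello flink\n");
		resultPath = getTempDirPath("result");
	}

	@Override
	protected void testProgram() throws Exception {
		// Run the example end to end through its main(); SomeExample is a placeholder.
		SomeExample.main(new String[] { inputPath, resultPath });
	}

	@Override
	protected void postSubmit() throws Exception {
		// Line-based comparison of actual vs. expected output.
		compareResultsByLinesInMemory("flink 1\nhello 2\nworld 1", resultPath);
	}
}

Anything beyond this skeleton (parameterized runs, delta-based comparison of numeric results) composes on top of the same three hooks, as PageRankITCase above shows.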