Commit 31ed0c4c authored by Aljoscha Krettek

[scala] Fix Formatting in Examples and add ITCases

Also actually use termination criterion in TransitiveClosureNaive
Java example.
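
For context, the fix passes the termination criterion as the second argument to closeWith in the Java example (visible in the TransitiveClosureNaive hunk below). A minimal sketch of the same idea in the Scala DataSet API, assuming iterateWithTermination is available; the halving job and the 0.001 threshold are illustrative and not taken from the example:

    import org.apache.flink.api.scala._

    object TerminationCriterionSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Illustrative input: a single value that is repeatedly halved.
        val initial: DataSet[Double] = env.fromElements(1.0)

        val result = initial.iterateWithTermination(100) { current =>
          val next = current.map(_ / 2)
          // Termination criterion: the iteration stops as soon as this data set
          // becomes empty, i.e. once the value drops to or below the threshold.
          val stillRunning = next.filter(_ > 0.001)
          (next, stillRunning)
        }

        result.print()
      }
    }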

Add ConnectedComponentsITCase for Scala Example

Also fix some formatting in the example code

Add WebLogAnalysisITCase for Scala Example

Some minor reformatting of example code and scaladoc.

Add ITCases for TriangleEnumeration Scala Examples

Also fix some formatting and make TriangleEnumerationOpt Scala produce the
same output as the Java version.

Add PageRankITCase for Scala Example

Also fix formatting in PageRank Scala Example.

Fix formatting in EnumTriangles Scala Examples

Remove Old/Deprecated Scala Examples and ITCases

Fix formatting in EnumTrianglesBasic.scala

Fix formatting in LinearRegression Scala Example

Remove old Scala LineRank Code and RelQuery Example

[scala] Fix typo in scaladoc in GroupedDataSet

[scala] Fix Scaladoc of Join and CoGroup Operation

The scaladoc was still referring to the variant of the join/coGroup function that
returns an Option.
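
For reference, the corrected scaladoc example (shown in the DataSet hunk further down) has the join function return its result directly rather than wrapping it in an Option. A condensed copy of that example follows; the environment setup and input values are assumed boilerplate, not part of the diff:

    import org.apache.flink.api.scala._

    // Condensed from the updated scaladoc: the join function returns the result
    // tuple directly instead of Some(...)/None. The input values are illustrative.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val left: DataSet[(String, Int, Int)] = env.fromElements(("a", 1, 2))
    val right: DataSet[(Int, String, Int)] = env.fromElements((1, "a", 3))

    val joined = left.join(right).where(0).isEqualTo(1) { (l, r) =>
      (l._1, r._2)
    }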

Fix tab vs. spaces in flink-scala and flink-scala-examples
Parent a41a29b4
......@@ -94,7 +94,7 @@ public class TransitiveClosureNaive implements ProgramDescription {
}
});
DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths);
DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths);
// emit result
......
......@@ -33,17 +33,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
*/
public class ConnectedComponentsData {
public static final Object[][] VERTICES = new Object[][] {
new Object[]{1L}, new Object[]{2L}, new Object[]{3L}, new Object[]{4L},
new Object[]{5L},new Object[]{6L}, new Object[]{7L}, new Object[]{8L},
new Object[]{9L}, new Object[]{10L}, new Object[]{11L}, new Object[]{12L},
new Object[]{13L}, new Object[]{14L}, new Object[]{15L}, new Object[]{16L}
};
public static final long[] VERTICES = new long[] {
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
List<Long> verticesList = new LinkedList<Long>();
for (Object[] vertex : VERTICES) {
verticesList.add((Long) vertex[0]);
for (long vertexId : VERTICES) {
verticesList.add(vertexId);
}
return env.fromCollection(verticesList);
}
......
......@@ -38,11 +38,11 @@ under the License.
<artifactId>flink-scala</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
......@@ -54,7 +54,7 @@ under the License.
<version>3.1.4</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
......@@ -64,7 +64,7 @@ under the License.
</execution>
<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
......@@ -237,7 +237,7 @@ under the License.
</includes>
</configuration>
</execution>
-->
-->
<execution>
<id>WordCount</id>
......@@ -260,7 +260,7 @@ under the License.
</includes>
</configuration>
</execution>
<!--
<!--
<execution>
<id>ConnectedComponents</id>
<phase>package</phase>
......@@ -282,27 +282,27 @@ under the License.
-->
<execution>
<id>TransitiveClosureNaive</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<execution>
<id>TransitiveClosureNaive</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>TransitiveClosureNaive</classifier>
<configuration>
<classifier>TransitiveClosureNaive</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class>
</manifestEntries>
</archive>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/wordcount/TransitiveClosureNaive*.class</include>
</includes>
</configuration>
</execution>
<includes>
<include>**/wordcount/TransitiveClosureNaive*.class</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
......
......@@ -24,27 +24,26 @@ import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.util.Collector
import org.apache.flink.examples.java.graph.util.EnumTrianglesData
import org.apache.flink.api.common.operators.Order
import scala.collection.mutable.MutableList
import scala.collection.mutable
/**
* Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
* A triangle consists of three edges that connect three vertices with each other.
*
* <p>
* The algorithm works as follows:
* The algorithm works as follows:
* It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
* that are connected by two edges. Finally, all triads are filtered for which no third edge exists
* that closes the triangle.
*
* <p>
* Input files are plain text files and must be formatted as follows:
* <ul>
* <li>Edges are represented as pairs for vertex IDs which are separated by space
* characters. Edges are separated by new-line characters.<br>
* For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
* that include a triangle
* </ul>
*
* - Edges are represented as pairs for vertex IDs which are separated by space
* characters. Edges are separated by new-line characters.
* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) edges (1)-(2), (2)-(12),
* (1)-(12), and (42)-(63) that include a triangle
*
* <pre>
* (1)
* / \
......@@ -59,13 +58,11 @@ import scala.collection.mutable.MutableList
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]
*
* <p>
* This example shows how to use:
* <ul>
* <li>Custom Java objects which extend Tuple
* <li>Group Sorting
* </ul>
*
*
* - Custom Java objects which extend Tuple
* - Group Sorting
*
*/
object EnumTrianglesBasic {
......@@ -91,7 +88,7 @@ object EnumTrianglesBasic {
// emit result
if (fileOutput) {
triangles.writeAsCsv(outputPath, "\n", " ")
triangles.writeAsCsv(outputPath, "\n", ",")
} else {
triangles.print()
}
......@@ -119,12 +116,12 @@ object EnumTrianglesBasic {
*/
class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
val vertices = MutableList[Integer]()
val vertices = mutable.MutableList[Integer]()
override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
// clear vertex list
vertices.clear
vertices.clear()
// build and emit triads
for(e <- edges.asScala) {
......@@ -153,10 +150,10 @@ object EnumTrianglesBasic {
false
}
} else {
System.out.println("Executing Enum Triangles Basic example with built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" See the documentation for the correct format of input files.");
System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>");
System.out.println("Executing Enum Triangles Basic example with built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>")
}
true
}
......
......@@ -26,6 +26,8 @@ import org.apache.flink.examples.java.graph.util.EnumTrianglesData
import org.apache.flink.api.common.operators.Order
import scala.collection.mutable.MutableList
import scala.collection.mutable
/**
* Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
......
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//package org.apache.flink.examples.scala.graph
//
//import org.apache.flink.client.LocalExecutor
//import org.apache.flink.api.common.{ ProgramDescription, Program }
//
//import org.apache.flink.api.scala._
//import org.apache.flink.api.scala.operators._
//
//
//class LineRank extends Program with Serializable {
//
// case class Edge(source: Int, target: Int, weight: Double)
// case class VectorElement(index: Int, value: Double)
//
// override def getPlan(args: String*) = {
// getScalaPlan(args(0).toInt, args(1), args(2), args(3).toInt, args(4))
// }
//
// def sumElements(elem1: VectorElement, elem2: VectorElement) = VectorElement(elem1.index, elem1.value + elem2.value)
//
// def sgtTimes(SGT: DataSetOLD[Edge], vector: DataSetOLD[VectorElement]) = {
// SGT.join(vector).where(_.source).isEqualTo(_.index)
// .map((edge, elem) => VectorElement(edge.target, edge.weight * elem.value))
// .groupBy(_.index).reduce(sumElements)
// }
//
// def tgTimes(TG: DataSetOLD[Edge], vector: DataSetOLD[VectorElement]) = {
// TG.join(vector).where(_.target).isEqualTo(_.index)
// .map((edge, elem) => VectorElement(edge.source, edge.weight * elem.value))
// }
//
// def rescale(v3: DataSetOLD[VectorElement], c: Double, r: Double) = {
// v3.map(elem => { VectorElement(elem.index, c * elem.value + (1 - c) * r) })
// }
//
// def powerMethod(SGT: DataSetOLD[Edge], TG: DataSetOLD[Edge], d: DataSetOLD[VectorElement], c: Double, r: Double)(v: DataSetOLD[VectorElement]) = {
//
// val v1 = d.join(v).where(_.index).isEqualTo(_.index)
// .map((leftElem, rightElem) => VectorElement(leftElem.index, leftElem.value * rightElem.value))
//
// val v2 = sgtTimes(SGT, v1)
// val v3 = tgTimes(TG, v2)
// val nextV = rescale(v3, c, r)
//
// nextV
// }
//
// def getScalaPlan(numSubTasks: Int, sourceIncidenceMatrixPath: String, targetIncidenceMatrixPath: String, m: Int,
// outputPath: String) = {
//
// val c = .85
// val r = 1.0 / m
//
// val SGT = DataSource(sourceIncidenceMatrixPath, CsvInputFormat[Edge]())
// val TG = DataSource(targetIncidenceMatrixPath, CsvInputFormat[Edge]())
//
// val d1 = SGT.map(edge => VectorElement(edge.target, edge.weight))
// .groupBy(_.index)
// .reduce(sumElements)
//
// val d2 = tgTimes(TG, d1)
//
// val d = d2.map(elem => VectorElement(elem.index, 1 / elem.value))
//
// val initialV1 = d.map(elem => VectorElement(elem.index, elem.value * m))
// val initialV2 = sgtTimes(SGT, initialV1)
// val initialV3 = tgTimes(TG, initialV2)
// val initialV = rescale(initialV3, c, r)
//
// val v = initialV.iterate(5, powerMethod(SGT, TG, d, c, r))
//
// val output = v.write(outputPath, CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output), "LineRank")
// plan.setDefaultParallelism(numSubTasks)
// plan
// }
//}
\ No newline at end of file
......@@ -109,4 +109,4 @@ object TransitiveClosureNaive {
env.fromCollection(edgeData)
}
}
}
}
\ No newline at end of file
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.examples.scala.iterative
//
//import org.apache.flink.client.LocalExecutor
//import org.apache.flink.api.common.Program
//import org.apache.flink.api.common.ProgramDescription
//
//import org.apache.flink.api.scala._
//import org.apache.flink.api.scala.operators._
//
///**
// * Example of using the bulk iteration with termination criterion with the
// * scala api.
// */
//class TerminationCriterion extends Program with ProgramDescription with Serializable {
// override def getDescription() = {
// "Parameters: <maxNumberIterations> <output>"
// }
//
// override def getPlan(args: String*) = {
// getScalaPlan(args(0).toInt, args(1))
// }
//
// def getScalaPlan(maxNumberIterations: Int, resultOutput: String) = {
// val dataSource = CollectionDataSource[Double](List(1.0))
//
// val halve = (partialSolution: DataSetOLD[Double]) => {
// partialSolution map { x => x /2 }
// }
//
// val terminationCriterion = (prev: DataSetOLD[Double], cur: DataSetOLD[Double]) => {
// val diff = prev cross cur map { (valuePrev, valueCurrent) => math.abs(valuePrev - valueCurrent) }
// diff filter {
// difference => difference > 0.1
// }
// }
//
// val iteration = dataSource.iterateWithTermination(maxNumberIterations, halve, terminationCriterion)
//
//
// val sink = iteration.write(resultOutput, CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(sink))
// plan.setDefaultParallelism(1)
// plan
// }
//}
//
//object RunTerminationCriterion {
// def main(args: Array[String]) {
// val tc = new TerminationCriterion
//
// if(args.size < 2) {
// println(tc.getDescription())
// return
// }
// val plan = tc.getScalaPlan(args(0).toInt, args(1))
// LocalExecutor.execute(plan)
// }
//}
......@@ -28,58 +28,56 @@ import org.apache.flink.examples.java.ml.util.LinearRegressionData
import scala.collection.JavaConverters._
/**
* This example implements a basic Linear Regression to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm.
* This example implements a basic Linear Regression to solve the y = theta0 + theta1*x problem
* using batch gradient descent algorithm.
*
* <p>
* Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br>
* Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set.
* In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters.
* The algorithm terminates after a fixed number of iterations (as in this implementation)
* Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering
* algorithm and works as follows:
*
* Giving a data set and target set, the BGD try to find out the best parameters for the data set
* to fit the target set.
* In each iteration, the algorithm computes the gradient of the cost function and use it to
* update all the parameters.
* The algorithm terminates after a fixed number of iterations (as in this implementation).
* With enough iteration, the algorithm can minimize the cost function and find the best parameters
* This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>.
*
* <p>
* This implementation works on one-dimensional data. And find the two-dimensional theta.<br>
* It find the best Theta parameter to fit the target.
*
* <p>
* This is the Wikipedia entry for the
* [[http://en.wikipedia.org/wiki/Linear_regression Linear regression]] and
* [[http://en.wikipedia.org/wiki/Gradient_descent Gradient descent algorithm]].
*
* This implementation works on one-dimensional data and finds the best two-dimensional theta to
* fit the target.
*
* Input files are plain text files and must be formatted as follows:
* <ul>
* <li>Data points are represented as two double values separated by a blank character. The first one represent the X(the training data) and the second represent the Y(target).
* Data points are separated by newline characters.<br>
* For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
* </ul>
*
* <p>
*
* - Data points are represented as two double values separated by a blank character. The first
* one represent the X(the training data) and the second represent the Y(target). Data points are
* separated by newline characters.
* For example `"-0.02 -0.04\n5.3 10.6\n"`gives two data points
* (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
*
* This example shows how to use:
* <ul>
* <li> Bulk iterations
* <li> Broadcast variables in bulk iterations
* <li> Custom Java objects (PoJos)
* </ul>
*
* - Bulk iterations
* - Broadcast variables in bulk iterations
*/
object LinearRegression {
// *************************************************************************
// PROGRAM
// *************************************************************************
def main(args: Array[String]) {
if (!parseParameters(args)) {
return
}
val env = ExecutionEnvironment.getExecutionEnvironment
val data: DataSet[Data] = getDataSet(env)
val parameters: DataSet[Params] = getParamsDataSet(env)
val data = getDataSet(env)
val parameters = getParamsDataSet(env)
val result = parameters.iterate(numIterations) { currentParameters =>
val newParameters = data
.map(new SubUpdate).withBroadcastSet(currentParameters, "parameters")
.reduce { (val1, val2) =>
val new_theta0: Double = val1._1.getTheta0 + val2._1.getTheta0
val new_theta1: Double = val1._1.getTheta1 + val2._1.getTheta1
val result: Params = new Params(new_theta0, new_theta1)
(result, val1._2 + val2._2)
}
.reduce { (p1, p2) =>
val result = p1._1 + p2._1
(result, p1._2 + p2._2)
}
.map { x => x._1.div(x._2) }
newParameters
}
......@@ -88,73 +86,28 @@ object LinearRegression {
result.writeAsText(outputPath)
}
else {
result.print
result.print()
}
env.execute("Scala Linear Regression example")
}
// *************************************************************************
// DATA TYPES
// *************************************************************************
/**
* A simple data sample, x means the input, and y means the target.
*/
class Data extends Serializable {
def this(x: Double, y: Double) {
this()
this.x = x
this.y = y
}
override def toString: String = {
"(" + x + "|" + y + ")"
}
var x: Double = .0
var y: Double = .0
}
case class Data(var x: Double, var y: Double)
/**
* A set of parameters -- theta0, theta1.
*/
class Params extends Serializable {
case class Params(theta0: Double, theta1: Double) {
def div(a: Int): Params = {
Params(theta0 / a, theta1 / a)
}
def this(x0: Double, x1: Double) {
this()
this.theta0 = x0
this.theta1 = x1
}
override def toString: String = {
theta0 + " " + theta1
}
def getTheta0: Double = {
theta0
}
def getTheta1: Double = {
theta1
}
def setTheta0(theta0: Double) {
this.theta0 = theta0
}
def setTheta1(theta1: Double) {
this.theta1 = theta1
}
def div(a: Integer): Params = {
this.theta0 = theta0 / a
this.theta1 = theta1 / a
return this
}
private var theta0: Double = .0
private var theta1: Double = .0
}
def +(other: Params) = {
Params(theta0 + other.theta0, theta1 + other.theta1)
}
}
// *************************************************************************
// USER FUNCTIONS
......@@ -163,24 +116,22 @@ object LinearRegression {
/**
* Compute a single BGD type update for every parameters.
*/
class SubUpdate extends RichMapFunction[Data, Tuple2[Params, Integer]] {
class SubUpdate extends RichMapFunction[Data, (Params, Int)] {
private var parameters: Traversable[Params] = null
var parameter: Params = null
private var count: Int = 1
private var parameter: Params = null
/** Reads the parameters from a broadcast variable into a collection. */
override def open(parameters: Configuration) {
this.parameters = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala
val parameters = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala
parameter = parameters.head
}
def map(in: Data): Tuple2[Params, Integer] = {
for (p <- parameters) {
this.parameter = p
}
val theta_0: Double = parameter.getTheta0 - 0.01 * ((parameter.getTheta0 + (parameter.getTheta1 * in.x)) - in.y)
val theta_1: Double = parameter.getTheta1 - 0.01 * (((parameter.getTheta0 + (parameter.getTheta1 * in.x)) - in.y) * in.x)
new Tuple2[Params, Integer](new Params(theta_0, theta_1), count)
def map(in: Data): (Params, Int) = {
val theta0 =
parameter.theta0 - 0.01 * ((parameter.theta0 + (parameter.theta1 * in.x)) - in.y)
val theta1 =
parameter.theta1 - 0.01 * (((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) * in.x)
(Params(theta0, theta1), 1)
}
}
......@@ -198,7 +149,7 @@ object LinearRegression {
if (programArguments.length == 3) {
dataPath = programArguments(0)
outputPath = programArguments(1)
numIterations = Integer.parseInt(programArguments(2))
numIterations = programArguments(2).toInt
}
else {
System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>")
......@@ -206,11 +157,13 @@ object LinearRegression {
}
}
else {
System.out.println("Executing Linear Regression example with default parameters and built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" We provide a data generator to create synthetic input files for this program.")
System.out.println(" Usage: LinearRegression <data path> <result path> <num iterations>")
System.out.println("Executing Linear Regression example with default parameters and " +
"built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" We provide a data generator to create synthetic input files for this " +
"program.")
System.out.println(" Usage: LinearRegression <data path> <result path> <num iterations>")
}
true
}
......@@ -225,7 +178,7 @@ object LinearRegression {
}
else {
val data = LinearRegressionData.DATA map {
case Array(x, y) => new Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
}
env.fromCollection(data)
}
......@@ -233,7 +186,7 @@ object LinearRegression {
private def getParamsDataSet(env: ExecutionEnvironment): DataSet[Params] = {
val params = LinearRegressionData.PARAMS map {
case Array(x, y) => new Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
}
env.fromCollection(params)
}
......
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.examples.scala.relational;
//
//import org.apache.flink.client.LocalExecutor
//import org.apache.flink.api.common.Program
//import org.apache.flink.api.common.ProgramDescription
//
//import org.apache.flink.api.scala._
//import org.apache.flink.api.scala.operators._
//
//
///**
// * The TPC-H is a decision support benchmark on relational data.
// * Its documentation and the data generator (DBGEN) can be found
// * on http://www.tpc.org/tpch/ .
// *
// * This Flink program implements a modified version of the query 3 of
// * the TPC-H benchmark including one join, some filtering and an
// * aggregation. The query resembles the following SQL statement:
// * <pre>
// * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
// * FROM orders, lineitem
// * WHERE l_orderkey = o_orderkey
// * AND o_orderstatus = "X"
// * AND YEAR(o_orderdate) > Y
// * AND o_orderpriority LIKE "Z%"
// * GROUP BY l_orderkey, o_shippriority;
// * </pre>
// */
//class RelationalQuery extends Program with ProgramDescription with Serializable {
//
// case class Order(orderId: Int, status: Char, year: Int, orderPriority: String, shipPriority: Int)
// case class LineItem(orderId: Int, extendedPrice: Double)
// case class PrioritizedOrder(orderId: Int, shipPriority: Int, revenue: Double)
//
//
// def getScalaPlan(numSubTasks: Int, ordersInput: String, lineItemsInput: String, ordersOutput: String, status: Char = 'F', minYear: Int = 1993, priority: String = "5") = {
//
// // ORDER intput: parse as CSV and select relevant fields
// val orders = DataSource(ordersInput, CsvInputFormat[(Int, String, String, String, String, String, String, Int)]("\n", '|'))
// .map { t => Order(t._1, t._3.charAt(0), t._5.substring(0,4).toInt, t._6, t._8) }
//
// // ORDER intput: parse as CSV and select relevant fields
// val lineItems = DataSource(lineItemsInput, CsvInputFormat[(Int, String, String, String, String, Double)]("\n", '|'))
// .map { t => LineItem(t._1, t._6) }
//
// // filter the orders input
// val filteredOrders = orders filter { o => o.status == status && o.year > minYear && o.orderPriority.startsWith(priority) }
//
// // join the filteres result with the lineitem input
// val prioritizedItems = filteredOrders join lineItems where { _.orderId } isEqualTo { _.orderId } map { (o, li) => PrioritizedOrder(o.orderId, o.shipPriority, li.extendedPrice) }
//
// // group by and sum the joined data
// val prioritizedOrders = prioritizedItems groupBy { pi => (pi.orderId, pi.shipPriority) } reduce { (po1, po2) => po1.copy(revenue = po1.revenue + po2.revenue) }
//
// // write the result as csv
// val output = prioritizedOrders.write(ordersOutput, CsvOutputFormat("\n", "|"))
//
// val plan = new ScalaPlan(Seq(output), "Relational Query")
// plan.setDefaultParallelism(numSubTasks)
// plan
// }
//
// override def getDescription() = {
// "Parameters: <orders>, <lineitem>, <output>, <degree-of-parallelism>"
// }
// override def getPlan(args: String*) = {
// getScalaPlan(args(3).toInt, args(0), args(1), args(2))
// }
//}
//
//
///**
// * Entry point to make the example standalone runnable with the local executor
// */
//object RunRelationalQuery {
//
// def main(args: Array[String]) {
// val query = new RelationalQuery
//
// if (args.size < 4) {
// println(query.getDescription)
// return
// }
// val plan = query.getScalaPlan(args(3).toInt, args(0), args(1), args(2))
// LocalExecutor.execute(plan)
// }
//}
//
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.examples.scala.testing
//
//// Uncomment if you need to rebuild it for PackagedProgramEndToEndTest
////
////import org.apache.flink.api.common.Program
////import org.apache.flink.api.common.ProgramDescription
////
////import org.apache.flink.api.scala._
////import org.apache.flink.api.scala.operators._
////
////
////class KMeansForTest extends Program with ProgramDescription {
////
//// override def getPlan(args: String*) = {
//// getScalaPlan(args(0).toInt, args(1), args(2), args(3), args(4).toInt)
//// }
////
//// case class Point(x: Double, y: Double, z: Double) {
//// def computeEuclidianDistance(other: Point) = other match {
//// case Point(x2, y2, z2) => math.sqrt(math.pow(x - x2, 2) + math.pow(y - y2, 2) + math.pow(z - z2, 2))
//// }
//// }
////
//// case class Distance(dataPoint: Point, clusterId: Int, distance: Double)
////
//// def asPointSum = (pid: Int, dist: Distance) => dist.clusterId -> PointSum(1, dist.dataPoint)
////
//// // def sumPointSums = (dataPoints: Iterator[(Int, PointSum)]) => dataPoints.reduce { (z, v) => z.copy(_2 = z._2 + v._2) }
//// def sumPointSums = (dataPoints: Iterator[(Int, PointSum)]) => {
//// dataPoints.reduce { (z, v) => z.copy(_2 = z._2 + v._2) }
//// }
////
////
//// case class PointSum(count: Int, pointSum: Point) {
//// def +(that: PointSum) = that match {
//// case PointSum(c, Point(x, y, z)) => PointSum(count + c, Point(x + pointSum.x, y + pointSum.y, z + pointSum.z))
//// }
////
//// def toPoint() = Point(round(pointSum.x / count), round(pointSum.y / count), round(pointSum.z / count))
////
//// // Rounding ensures that we get the same results in a multi-iteration run
//// // as we do in successive single-iteration runs, since the output format
//// // only contains two decimal places.
//// private def round(d: Double) = math.round(d * 100.0) / 100.0;
//// }
////
//// def parseInput = (line: String) => {
//// val PointInputPattern = """(\d+)\|-?(\d+\.\d+)\|-?(\d+\.\d+)\|-?(\d+\.\d+)\|""".r
//// val PointInputPattern(id, x, y, z) = line
//// (id.toInt, Point(x.toDouble, y.toDouble, z.toDouble))
//// }
////
//// def formatOutput = (cid: Int, p: Point) => "%d|%.2f|%.2f|%.2f|".format(cid, p.x, p.y, p.z)
////
//// def computeDistance(p: (Int, Point), c: (Int, Point)) = {
//// val ((pid, dataPoint), (cid, clusterPoint)) = (p, c)
//// val distToCluster = dataPoint.computeEuclidianDistance(clusterPoint)
////
//// pid -> Distance(dataPoint, cid, distToCluster)
//// }
////
////
//// def getScalaPlan(numSubTasks: Int, dataPointInput: String, clusterInput: String, clusterOutput: String, numIterations: Int) = {
//// val dataPoints = DataSource(dataPointInput, DelimitedInputFormat(parseInput))
//// val clusterPoints = DataSource(clusterInput, DelimitedInputFormat(parseInput))
////
//// val finalCenters = clusterPoints.iterate(numIterations, { centers =>
////
//// val distances = dataPoints cross centers map computeDistance
//// val nearestCenters = distances groupBy { case (pid, _) => pid } reduceGroup { ds => ds.minBy(_._2.distance) } map asPointSum.tupled
//// val newCenters = nearestCenters groupBy { case (cid, _) => cid } reduceGroup sumPointSums map { case (cid, pSum) => cid -> pSum.toPoint() }
////
//// newCenters
//// })
////
//// val output = finalCenters.write(clusterOutput, DelimitedOutputFormat(formatOutput.tupled))
////
//// val plan = new ScalaPlan(Seq(output), "KMeans Iteration (ONLY FOR TESTING)")
//// plan.setDefaultParallelism(numSubTasks)
//// plan
//// }
////
//// override def getDescription() = {
//// "Parameters: [numSubStasksS] [dataPoints] [clusterCenters] [output] [numIterations]"
//// }
////}
\ No newline at end of file
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.examples.scala.wordcount
//
//import org.apache.flink.client.LocalExecutor
//import org.apache.flink.api.common.Program
//import org.apache.flink.api.common.ProgramDescription
//
//import org.apache.flink.api.scala._
//import org.apache.flink.api.scala.operators._
//
//
///**
// * Implementation of word count in Scala. This example uses the built in count function for tuples.
// */
//class WordCountWithCount extends WordCount {
//
// override def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = {
// val input = TextFile(textInput)
//
// val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } }
// val counts = words groupBy { x => x } count()
//
// val output = counts.write(wordsOutput, CsvOutputFormat("\n", " "))
//
// val plan = new ScalaPlan(Seq(output), "Word Count")
// plan.setDefaultParallelism(numSubTasks)
// plan
// }
//}
//
//
///**
// * Entry point to make the example standalone runnable with the local executor.
// */
//object RunWordCountWithCount {
// def main(args: Array[String]) {
// val wc = new WordCountWithCount
// if (args.size < 3) {
// println(wc.getDescription)
// return
// }
// val plan = wc.getScalaPlan(args(0).toInt, args(1), args(2))
// LocalExecutor.execute(plan)
// }
//}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.examples.scala.wordcount
//
//import org.apache.flink.client.LocalExecutor
//import org.apache.flink.api.common.Program
//import org.apache.flink.api.common.ProgramDescription
//
//import org.apache.flink.types.IntValue
//import org.apache.flink.types.StringValue
//
//import org.apache.flink.api.scala._
//import org.apache.flink.api.scala.operators._
//
//
///**
// * Implementation of word count in Scala, using a user defined type rather than one of the
// * built-in supported types like primitives, tuples, or other (nested) case classes.
// */
//class WordCountWithUserDefinedType extends Program with Serializable {
//
// def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = {
// val input = TextFile(textInput)
//
// val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } map { w => (new StringValue(w), new IntValue(1)) } }
//
// val counts = words
// .groupBy { case (word, _) => word }
// .reduce { (w1, w2) => (w1._1, new IntValue(w1._2.getValue + w2._2.getValue)) }
//
// val output = counts.write(wordsOutput, CsvOutputFormat("\n", " "))
//
// val plan = new ScalaPlan(Seq(output), "Word Count (immutable)")
// plan.setDefaultParallelism(numSubTasks)
// plan
// }
//
//
// override def getPlan(args: String*) = {
// getScalaPlan(args(0).toInt, args(1), args(2))
// }
//}
......@@ -39,23 +39,23 @@ under the License.
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
......@@ -108,7 +108,7 @@ under the License.
<version>3.1.4</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
......@@ -118,7 +118,7 @@ under the License.
</execution>
<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
......
......@@ -635,14 +635,12 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
* val left: DataSet[(String, Int, Int)] = ...
* val right: DataSet[(Int, String, Int)] = ...
* val joined = left.join(right).where(0).isEqualTo(1) { (l, r) =>
* if (l._2 > 4) {
* Some((l._2, r._3))
* } else {
* None
* }
* (l._1, r._2)
* }
* }}}
* This can be used to implement a filter directly in the join or to output more than one values:
* A join function with a [[Collector]] can be used to implement a filter directly in the join
* or to output more than one values. This type of join function does not return a value, instead
* values are emitted using the collector:
* {{{
* val left: DataSet[(String, Int, Int)] = ...
* val right: DataSet[(Int, String, Int)] = ...
......@@ -696,11 +694,12 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
* val right: DataSet[(Int, String, Int)] = ...
* val coGrouped = left.coGroup(right).where(0).isEqualTo(1) { (l, r) =>
* // l and r are of type TraversableOnce
* Some((l.min, r.max))
* (l.min, r.max)
* }
* }}}
* This can be used to implement a filter directly in the coGroup or to output more than one
* values:
* A coGroup function with a [[Collector]] can be used to implement a filter directly in the
* coGroup or to output more than one values. This type of coGroup function does not return a
* value, instead values are emitted using the collector
* {{{
* val left: DataSet[(String, Int, Int)] = ...
* val right: DataSet[(Int, String, Int)] = ...
......
......@@ -144,11 +144,9 @@ trait GroupedDataSet[T] {
}
/**
* /**
* Private implementation for [[GroupedDataSet]] to keep the implementation details, i.e. the
* parameters of the constructor, hidden.
*/
*/
private[flink] class GroupedDataSetImpl[T: ClassTag](
private val set: JavaDataSet[T],
private val keys: Keys[T])
......@@ -256,7 +254,7 @@ private[flink] class GroupedDataSetImpl[T: ClassTag](
}
def reduceGroup[R: TypeInformation: ClassTag](
fun: (TraversableOnce[T]) => R): DataSet[R] = {
fun: (TraversableOnce[T]) => R): DataSet[R] = {
Validate.notNull(fun, "Group reduce function must not be null.")
val reducer = new GroupReduceFunction[T, R] {
def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
......
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.test.exampleScalaPrograms;
//
//import org.apache.flink.api.common.Plan;
//import org.apache.flink.configuration.Configuration;
//import org.apache.flink.examples.scala.graph.ComputeEdgeDegrees;
//
//public class ComputeEdgeDegreesITCase extends org.apache.flink.test.recordJobTests.ComputeEdgeDegreesITCase {
//
// public ComputeEdgeDegreesITCase(Configuration config) {
// super(config);
// }
//
// @Override
// protected Plan getTestJob() {
// ComputeEdgeDegrees computeDegrees = new ComputeEdgeDegrees();
// return computeDegrees.getScalaPlan(
// config.getInteger("ComputeEdgeDegreesTest#NumSubtasks", DOP),
// edgesPath, resultPath);
// }
//}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.test.exampleScalaPrograms;
//
//import org.apache.flink.api.common.Plan;
//import org.apache.flink.examples.scala.graph.ConnectedComponents;
//
//public class ConnectedComponentsITCase extends org.apache.flink.test.iterative.ConnectedComponentsITCase {
//
// @Override
// protected Plan getTestJob() {
// ConnectedComponents cc = new ConnectedComponents();
// Plan plan = cc.getScalaPlan(verticesPath, edgesPath, resultPath, 100);
// plan.setDefaultParallelism(DOP);
// return plan;
// }
//}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.exampleScalaPrograms;
import org.apache.flink.examples.scala.graph.ConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
import java.io.BufferedReader;
public class ConnectedComponentsITCase extends JavaProgramTestBase {
private static final long SEED = 0xBADC0FFEEBEEFL;
private static final int NUM_VERTICES = 1000;
private static final int NUM_EDGES = 10000;
private String verticesPath;
private String edgesPath;
private String resultPath;
@Override
protected void preSubmit() throws Exception {
verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
resultPath = getTempFilePath("results");
}
@Override
protected void testProgram() throws Exception {
ConnectedComponents.main(new String[] {verticesPath, edgesPath, resultPath, "100"});
}
@Override
protected void postSubmit() throws Exception {
for (BufferedReader reader : getResultReader(resultPath)) {
ConnectedComponentsData.checkOddEvenResult(reader);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.exampleScalaPrograms;
import org.apache.flink.examples.scala.graph.EnumTrianglesBasic;
import org.apache.flink.test.testdata.EnumTriangleData;
import org.apache.flink.test.util.JavaProgramTestBase;
public class EnumTriangleBasicITCase extends JavaProgramTestBase {
protected String edgePath;
protected String resultPath;
@Override
protected void preSubmit() throws Exception {
edgePath = createTempFile("edges", EnumTriangleData.EDGES);
resultPath = getTempDirPath("triangles");
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_ID, resultPath);
}
@Override
protected void testProgram() throws Exception {
EnumTrianglesBasic.main(new String[] { edgePath, resultPath });
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.exampleScalaPrograms;
import org.apache.flink.examples.scala.graph.EnumTrianglesOpt;
import org.apache.flink.test.testdata.EnumTriangleData;
import org.apache.flink.test.util.JavaProgramTestBase;
public class EnumTriangleOptITCase extends JavaProgramTestBase {
protected String edgePath;
protected String resultPath;
@Override
protected void preSubmit() throws Exception {
edgePath = createTempFile("edges", EnumTriangleData.EDGES);
resultPath = getTempDirPath("triangles");
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_DEGREE, resultPath);
}
@Override
protected void testProgram() throws Exception {
EnumTrianglesOpt.main(new String[] { edgePath, resultPath });
}
}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.test.exampleScalaPrograms;
//
//import org.apache.flink.api.common.Plan;
//import org.apache.flink.configuration.Configuration;
//import org.apache.flink.examples.scala.graph.EnumTrianglesOnEdgesWithDegrees;
//
//public class EnumTrianglesOnEdgesWithDegreesITCase extends org.apache.flink.test.recordJobTests.EnumTrianglesOnEdgesWithDegreesITCase {
//
// public EnumTrianglesOnEdgesWithDegreesITCase(Configuration config) {
// super(config);
// }
//
// @Override
// protected Plan getTestJob() {
// EnumTrianglesOnEdgesWithDegrees enumTriangles = new EnumTrianglesOnEdgesWithDegrees();
// return enumTriangles.getScalaPlan(
// config.getInteger("EnumTrianglesTest#NumSubtasks", DOP),
// edgesPath, resultPath);
// }
//}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.exampleScalaPrograms;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.examples.scala.graph.PageRankBasic;
import org.apache.flink.test.testdata.PageRankData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
@RunWith(Parameterized.class)
public class PageRankITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 2;
private int curProgId = config.getInteger("ProgramId", -1);
private String verticesPath;
private String edgesPath;
private String resultPath;
private String expectedResult;
public PageRankITCase(Configuration config) {
super(config);
}
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
}
@Override
protected void testProgram() throws Exception {
expectedResult = runProgram(curProgId);
}
@Override
protected void postSubmit() throws Exception {
compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
}
@Parameters
public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
for(int i=1; i <= NUM_PROGRAMS; i++) {
Configuration config = new Configuration();
config.setInteger("ProgramId", i);
tConfigs.add(config);
}
return toParameterList(tConfigs);
}
public String runProgram(int progId) throws Exception {
switch(progId) {
case 1: {
PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
return PageRankData.RANKS_AFTER_3_ITERATIONS;
}
case 2: {
// start with a very high number of iteration such that the dynamic convergence criterion must handle termination
PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
}
default:
throw new IllegalArgumentException("Invalid program id");
}
}
}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.test.exampleScalaPrograms;
//
//import org.apache.flink.api.common.Plan;
//import org.apache.flink.configuration.Configuration;
//import org.apache.flink.examples.scala.relational.RelationalQuery;
//import org.junit.runner.RunWith;
//import org.junit.runners.Parameterized;
//
//import java.util.Locale;
//
//@RunWith(Parameterized.class)
//public class RelationalQueryITCase extends org.apache.flink.test.recordJobTests.TPCHQuery3ITCase {
//
// public RelationalQueryITCase(Configuration config) {
// super(config);
// Locale.setDefault(Locale.US);
// }
//
// @Override
// protected Plan getTestJob() {
//
// RelationalQuery tpch3 = new RelationalQuery();
// return tpch3.getScalaPlan(
// config.getInteger("dop", 1),
// ordersPath,
// lineitemsPath,
// resultPath,
// 'F', 1993, "5");
// }
//}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.test.exampleScalaPrograms;
//
//import org.apache.flink.api.common.Plan;
//import org.apache.flink.examples.scala.graph.TransitiveClosureNaive;
//import org.apache.flink.test.util.RecordAPITestBase;
//
//public class TransitiveClosureNaiveITCase extends RecordAPITestBase {
//
// protected String verticesPath = null;
// protected String edgesPath = null;
// protected String resultPath = null;
//
// private static final String VERTICES = "0\n1\n2";
// private static final String EDGES = "0|1\n1|2";
// private static final String EXPECTED = "0|0|0\n0|1|1\n0|2|2\n1|1|0\n1|2|1\n2|2|0";
//
// @Override
// protected void preSubmit() throws Exception {
// verticesPath = createTempFile("vertices.txt", VERTICES);
// edgesPath = createTempFile("edges.txt", EDGES);
// resultPath = getTempDirPath("transitiveClosure");
// }
//
// @Override
// protected Plan getTestJob() {
// TransitiveClosureNaive transitiveClosureNaive = new TransitiveClosureNaive();
// // "2" is the number of iterations here
// return transitiveClosureNaive.getScalaPlan(DOP, 2, verticesPath, edgesPath, resultPath);
// }
//
// @Override
// protected void postSubmit() throws Exception {
// compareResultsByLinesInMemory(EXPECTED, resultPath);
// }
//}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.test.exampleScalaPrograms;
//
//import org.apache.flink.api.common.Plan;
//import org.apache.flink.examples.scala.relational.WebLogAnalysis;
//
//public class WebLogAnalysisITCase extends org.apache.flink.test.recordJobTests.WebLogAnalysisITCase {
//
// @Override
// protected Plan getTestJob() {
// WebLogAnalysis webLogAnalysis = new WebLogAnalysis();
// return webLogAnalysis.getScalaPlan(DOP, docsPath, ranksPath, visitsPath, resultPath);
// }
//}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.exampleScalaPrograms;
import org.apache.flink.examples.scala.relational.WebLogAnalysis;
import org.apache.flink.test.testdata.WebLogAnalysisData;
import org.apache.flink.test.util.JavaProgramTestBase;
public class WebLogAnalysisITCase extends JavaProgramTestBase {
private String docsPath;
private String ranksPath;
private String visitsPath;
private String resultPath;
@Override
protected void preSubmit() throws Exception {
docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
resultPath = getTempDirPath("result");
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
}
@Override
protected void testProgram() throws Exception {
WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath});
}
}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.test.exampleScalaPrograms;
//
//import org.apache.flink.api.common.Plan;
//import org.apache.flink.examples.scala.wordcount.WordCountWithUserDefinedType;
//
//
//public class WordCountPactValueITCase extends org.apache.flink.test.recordJobTests.WordCountITCase {
//
// @Override
// protected Plan getTestJob() {
// WordCountWithUserDefinedType wc = new WordCountWithUserDefinedType();
// return wc.getScalaPlan(DOP, textPath, resultPath);
// }
//}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.test.exampleScalaPrograms;
//
//import org.apache.flink.api.common.Plan;
//import org.apache.flink.examples.scala.wordcount.WordCountWithCount;
//
//public class WordCountWithCountFunctionITCase extends org.apache.flink.test.recordJobTests.WordCountITCase {
//
// @Override
// protected Plan getTestJob() {
// return new WordCountWithCount().getScalaPlan(DOP, textPath, resultPath);
// }
//}