Commit 4cce46eb authored by mbalassi, committed by Aljoscha Krettek

Renamed java examples package

Parent 7fe9273f
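In effect, every class under org.apache.flink.example.java moves to org.apache.flink.examples.java, so any program referencing the old package needs a one-line import change. A minimal before/after sketch (WordCountData is one of the classes touched below):

// Before this commit:
import org.apache.flink.example.java.wordcount.util.WordCountData;

// After this commit:
import org.apache.flink.examples.java.wordcount.util.WordCountData;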
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.example.java.clustering;
package org.apache.flink.examples.java.clustering;
import java.io.Serializable;
import java.util.Collection;
......@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.example.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
......
......@@ -17,12 +17,12 @@
*/
package org.apache.flink.example.java.clustering.util;
package org.apache.flink.examples.java.clustering.util;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.example.java.clustering.KMeans.Centroid;
import org.apache.flink.example.java.clustering.KMeans.Point;
import org.apache.flink.examples.java.clustering.KMeans.Centroid;
import org.apache.flink.examples.java.clustering.KMeans.Point;
import java.util.LinkedList;
import java.util.List;
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.clustering.util;
package org.apache.flink.examples.java.clustering.util;
import java.io.BufferedWriter;
import java.io.File;
......@@ -27,7 +27,7 @@ import java.text.DecimalFormat;
import java.util.Locale;
import java.util.Random;
import org.apache.flink.example.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.KMeans;
/**
* Generates data for the {@link KMeans} example program.
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.graph;
package org.apache.flink.examples.java.graph;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.FlatJoinFunction;
......@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.example.java.graph.util.ConnectedComponentsData;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
/**
* An implementation of the connected components algorithm, using a delta iteration.
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.example.java.graph;
package org.apache.flink.examples.java.graph;
import java.util.ArrayList;
import java.util.Iterator;
......@@ -30,9 +30,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.example.java.graph.util.EnumTrianglesData;
import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad;
import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
/**
* Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
......
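For intuition about the step this doc comment describes, here is a minimal plain-Java sketch of triangle enumeration, independent of the Flink implementation and using a made-up edge set: pair edges that share their first vertex into triads, then keep a triad only when its closing edge exists.

import java.util.HashSet;
import java.util.Set;

public class TriangleSketch {
    public static void main(String[] args) {
        // Hypothetical undirected edges, sorted so the smaller vertex comes first
        // and, per first vertex, the second vertex increases.
        int[][] edges = {{1, 2}, {1, 3}, {2, 3}, {2, 4}};
        Set<String> edgeSet = new HashSet<>();
        for (int[] e : edges) edgeSet.add(e[0] + "-" + e[1]);

        // Build triads (v, w1, w2) from edge pairs sharing their first vertex,
        // then emit a triangle if the closing edge (w1, w2) exists.
        for (int i = 0; i < edges.length; i++) {
            for (int j = i + 1; j < edges.length; j++) {
                if (edges[i][0] == edges[j][0]
                        && edgeSet.contains(edges[i][1] + "-" + edges[j][1])) {
                    System.out.printf("triangle: %d %d %d%n",
                            edges[i][0], edges[i][1], edges[j][1]);
                }
            }
        }
    }
}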
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.example.java.graph;
package org.apache.flink.examples.java.graph;
import java.util.ArrayList;
import java.util.Iterator;
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.example.java.graph.util.EnumTrianglesData;
import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.EdgeWithDegrees;
import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad;
import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.EdgeWithDegrees;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
/**
* Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.example.java.graph;
package org.apache.flink.examples.java.graph;
import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
......@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.example.java.graph.util.PageRankData;
import org.apache.flink.examples.java.graph.util.PageRankData;
/**
* A basic implementation of the Page Rank algorithm using a bulk iteration.
......
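A sketch of the bulk-iteration scaffold this program is built on, written against the Java DataSet API of this era; the rank update is a stand-in for illustration, not the file's actual logic:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class BulkIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Initial ranks; a real PageRank starts from 1/numPages per vertex.
        DataSet<Double> ranks = env.fromElements(1.0, 1.0, 1.0);

        // Open a bulk iteration: the loop body below runs 10 times.
        IterativeDataSet<Double> iteration = ranks.iterate(10);

        // Stand-in for the rank update (contribution spreading + damping).
        DataSet<Double> newRanks = iteration.map(new MapFunction<Double, Double>() {
            @Override
            public Double map(Double rank) {
                return rank * 0.85 + 0.15;
            }
        });

        // Close the loop, print the result, and run the program.
        iteration.closeWith(newRanks).print();
        env.execute("Bulk iteration sketch");
    }
}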
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.graph;
package org.apache.flink.examples.java.graph;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
......@@ -25,10 +25,9 @@ import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.example.java.graph.util.ConnectedComponentsData;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.util.Collector;
import java.util.HashSet;
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.graph.util;
package org.apache.flink.examples.java.graph.util;
import java.util.LinkedList;
import java.util.List;
......
......@@ -17,14 +17,14 @@
*/
package org.apache.flink.example.java.graph.util;
package org.apache.flink.examples.java.graph.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
/**
* Provides the default data sets used for the Triangle Enumeration example programs.
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.graph.util;
package org.apache.flink.examples.java.graph.util;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.graph.util;
package org.apache.flink.examples.java.graph.util;
import java.util.ArrayList;
import java.util.List;
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.example.java.misc;
package org.apache.flink.examples.java.misc;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.ml;
package org.apache.flink.examples.java.ml;
import java.io.Serializable;
import java.util.Collection;
......@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.example.java.ml.util.LinearRegressionData;
import org.apache.flink.examples.java.ml.util.LinearRegressionData;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
......
......@@ -16,12 +16,12 @@
* limitations under the License.
*/
package org.apache.flink.example.java.ml.util;
package org.apache.flink.examples.java.ml.util;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.example.java.ml.LinearRegression.Data;
import org.apache.flink.example.java.ml.LinearRegression.Params;
import org.apache.flink.examples.java.ml.LinearRegression.Data;
import org.apache.flink.examples.java.ml.LinearRegression.Params;
import java.util.LinkedList;
import java.util.List;
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.ml.util;
package org.apache.flink.examples.java.ml.util;
import java.io.BufferedWriter;
import java.io.File;
......@@ -28,7 +28,7 @@ import java.util.Locale;
import java.util.Random;
/**
* Generates data for the {@link org.apache.flink.example.java.ml.LinearRegression} example program.
* Generates data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
*/
public class LinearRegressionDataGenerator {
......@@ -43,7 +43,7 @@ public class LinearRegressionDataGenerator {
private static final char DELIMITER = ' ';
/**
* Main method to generate data for the {@link org.apache.flink.example.java.ml.LinearRegression} example program.
* Main method to generate data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
* <p>
* The generator creates two files:
* <ul>
......
......@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.example.java.relational;
package org.apache.flink.examples.java.relational;
import java.io.IOException;
import java.util.ArrayList;
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.example.java.relational;
package org.apache.flink.examples.java.relational;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.aggregation.Aggregations;
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.example.java.relational;
package org.apache.flink.examples.java.relational;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.relational;
package org.apache.flink.examples.java.relational;
import java.text.DateFormat;
import java.text.ParseException;
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.example.java.relational;
package org.apache.flink.examples.java.relational;
import org.apache.flink.api.common.functions.CoGroupFunction;
......@@ -27,8 +27,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.example.java.relational.util.WebLogData;
import org.apache.flink.example.java.relational.util.WebLogDataGenerator;
import org.apache.flink.examples.java.relational.util.WebLogData;
import org.apache.flink.examples.java.relational.util.WebLogDataGenerator;
/**
* This program processes web logs and relational data.
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.example.java.relational.util;
package org.apache.flink.examples.java.relational.util;
import java.util.ArrayList;
import java.util.List;
......
......@@ -17,14 +17,14 @@
*/
package org.apache.flink.example.java.relational.util;
package org.apache.flink.examples.java.relational.util;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Calendar;
import java.util.Random;
import org.apache.flink.example.java.relational.WebLogAnalysis;
import org.apache.flink.examples.java.relational.WebLogAnalysis;
/**
* Data generator for the {@link WebLogAnalysis} example program.
......
......@@ -16,14 +16,14 @@
* limitations under the License.
*/
package org.apache.flink.example.java.wordcount;
package org.apache.flink.examples.java.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.example.java.wordcount.util.WordCountData;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram
......
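For reference, the canonical shape of such a program in the Java DataSet API of this era, as a self-contained sketch with inline data rather than this file's exact code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements("to be or not to be");

        DataSet<Tuple2<String, Integer>> counts = text
            // split each line into (word, 1) pairs
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.toLowerCase().split("\\W+")) {
                        if (word.length() > 0) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                }
            })
            // group by the word (field 0) and sum the counts (field 1)
            .groupBy(0)
            .sum(1);

        counts.print();
        env.execute("WordCount sketch");
    }
}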
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.example.java.wordcount.util;
package org.apache.flink.examples.java.wordcount.util;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
......
......@@ -20,7 +20,7 @@ package org.apache.flink.examples.scala.clustering
import org.apache.flink.api.common.functions._
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.example.java.clustering.util.KMeansData
import org.apache.flink.examples.java.clustering.util.KMeansData
import scala.collection.JavaConverters._
......
......@@ -15,56 +15,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.graph;
package org.apache.flink.examples.scala.graph
import org.apache.flink.api.scala._
import org.apache.flink.example.java.graph.util.ConnectedComponentsData
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.util.Collector
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
/**
* An implementation of the connected components algorithm, using a delta iteration.
*
* Initially, the algorithm assigns each vertex an unique ID. In each step, a vertex picks the minimum of its own ID and its
* neighbors' IDs, as its new ID and tells its neighbors about its new ID. After the algorithm has completed, all vertices in the
* same component will have the same ID.
*
* A vertex whose component ID did not change needs not propagate its information in the next step. Because of that,
* the algorithm is easily expressible via a delta iteration. We here model the solution set as the vertices with
* their current component ids, and the workset as the changed vertices. Because we see all vertices initially as
* changed, the initial workset and the initial solution set are identical. Also, the delta to the solution set
* is consequently also the next workset.
*
* Input files are plain text files and must be formatted as follows:
*
* - Vertices represented as IDs and separated by new-line characters.
*
* For example
* {{{
* "1\n2\n12\n42\n63\n"
* }}}
* gives five vertices (1), (2), (12), (42), and (63).
*
*
* - Edges are represented as pairs for vertex IDs which are separated by space
* characters. Edges are separated by new-line characters.
*
* For example
* {{{
* "1 2\n2 12\n1 12\n42 63\n"
* }}}
* gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
* Initially, the algorithm assigns each vertex a unique ID. In each step, a vertex picks the
* minimum of its own ID and its neighbors' IDs, as its new ID and tells its neighbors about its
* new ID. After the algorithm has completed, all vertices in the same component will have the same
* ID.
*
* A vertex whose component ID did not change need not propagate its information in the next
* step. Because of that, the algorithm is easily expressible via a delta iteration. We here model
* the solution set as the vertices with their current component ids, and the workset as the changed
* vertices. Because we see all vertices initially as changed, the initial workset and the initial
* solution set are identical. Also, the delta to the solution set is consequently also the next
* workset.
*
* Input files are plain text files and must be formatted as follows:
*
* - Vertices represented as IDs and separated by new-line characters. For example,
* `"1\n2\n12\n42\n63\n"` gives five vertices (1), (2), (12), (42), and (63).
* - Edges are represented as pairs of vertex IDs separated by space characters. Edges
* are separated by new-line characters. For example `"1 2\n2 12\n1 12\n42 63\n"`
* gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
*
* Usage:
* {{{
* ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>
* }}}
* {{{
* ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>
* }}}
*
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.example.java.graph.util.ConnectedComponentsData]]
* and 10 iterations.
* [[org.apache.flink.examples.java.graph.util.ConnectedComponentsData]] and 10 iterations.
*
*
* This example shows how to use:
......@@ -78,46 +63,37 @@ object ConnectedComponents {
if (!parseParameters(args)) {
return
}
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// read vertex and edge data
// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map(id => (id, id))
val vertices = getVerticesDataSet(env).map { id => (id, id) }
// undirected edges by emitting for each input edge the input edges itself and an inverted version
val edges = getEdgesDataSet(env)
.flatMap { (edge, out: Collector[(Long, Long)]) =>
out.collect(edge)
out.collect((edge._2, edge._1))
}
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
(s, ws) => {
(s, ws) =>
// apply the step logic: join with the edges
val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
Some((edge._2, vertex._2))
}
// select the minimum neighbor
val minNeighbors = allNeighbors.groupBy(0).min(1)
// apply the step logic: join with the edges
val allNeighbors = ws.join(edges)
.where(0).equalTo(0)
.map { in => (in._2._2, in._1._2) }
// select the minimum neighbor
val minNeighbors = allNeighbors.groupBy(0).aggregate(Aggregations.MIN, 1)
// update if the component of the candidate is smaller
val updatedComponents = minNeighbors.join(s).where(0).equalTo(0)
.flatMap { newAndOldComponent =>
newAndOldComponent match {
case ((vId, cNew), (_, cOld)) if cNew < cOld => Some((vId, cNew))
case _ => None
}
}
val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
(newVertex, oldVertex) => if (newVertex._2 < oldVertex._2) Some(newVertex) else None
}
// delta and new workset are identical
(updatedComponents, updatedComponents)
}
(updatedComponents, updatedComponents)
}
if (fileOutput) {
if (fileOutput) {
verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
} else {
verticesWithComponents.print()
......@@ -154,10 +130,7 @@ object ConnectedComponents {
.map { x => x._1 }
}
else {
val vertexData = ConnectedComponentsData.VERTICES map {
case Array(x) => x.asInstanceOf[Long]
}
env.fromCollection(vertexData);
env.fromCollection(ConnectedComponentsData.VERTICES)
}
}
......@@ -173,7 +146,7 @@ object ConnectedComponents {
val edgeData = ConnectedComponentsData.EDGES map {
case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
}
env.fromCollection(edgeData);
env.fromCollection(edgeData)
}
}
......@@ -182,4 +155,4 @@ object ConnectedComponents {
private var edgesPath: String = null
private var maxIterations: Int = 10
private var outputPath: String = null
}
\ No newline at end of file
}
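The delta-iteration scaffold that the program above relies on looks like this in the Java DataSet API; a structural sketch only, with the step function elided:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class DeltaIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (vertexId, componentId): initially every vertex is its own component.
        DataSet<Tuple2<Long, Long>> vertices =
            env.fromElements(new Tuple2<Long, Long>(1L, 1L),
                             new Tuple2<Long, Long>(2L, 2L));

        // Solution set and initial workset are identical; key on field 0.
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
            vertices.iterateDelta(vertices, 10, 0);

        // Step function elided: it would join the workset with the edges,
        // take the minimum neighbor component, and keep only real changes.
        DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset();

        // Delta and next workset coincide, exactly as in the code above.
        DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

        result.print();
        env.execute("Delta iteration sketch");
    }
}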
......@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.util.Collector
import org.apache.flink.example.java.graph.util.EnumTrianglesData
import org.apache.flink.examples.java.graph.util.EnumTrianglesData
import org.apache.flink.api.common.operators.Order
import scala.collection.mutable.MutableList
......@@ -175,4 +175,4 @@ object EnumTrianglesBasic {
private var edgePath: String = null
private var outputPath: String = null
}
\ No newline at end of file
}
......@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.util.Collector
import org.apache.flink.example.java.graph.util.EnumTrianglesData
import org.apache.flink.examples.java.graph.util.EnumTrianglesData
import org.apache.flink.api.common.operators.Order
import scala.collection.mutable.MutableList
......@@ -241,4 +241,4 @@ object EnumTrianglesOpt {
private var edgePath: String = null
private var outputPath: String = null
}
\ No newline at end of file
}
......@@ -18,7 +18,7 @@
package org.apache.flink.examples.scala.graph
import org.apache.flink.api.scala._
import org.apache.flink.example.java.graph.util.PageRankData
import org.apache.flink.examples.java.graph.util.PageRankData
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.util.Collector
......@@ -197,4 +197,4 @@ object PageRankBasic {
private var numPages: Long = 0
private var maxIterations: Int = 10
}
\ No newline at end of file
}
......@@ -18,12 +18,11 @@
package org.apache.flink.examples.scala.graph
import org.apache.flink.api.scala._
import org.apache.flink.example.java.graph.util.ConnectedComponentsData
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
import org.apache.flink.util.Collector
object TransitiveClosureNaive {
def main (args: Array[String]): Unit = {
if (!parseParameters(args)) {
return
......@@ -33,22 +32,22 @@ object TransitiveClosureNaive {
val edges = getEdgesDataSet(env)
val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long,Long)] =>
val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long, Long)] =>
val nextPaths = prevPaths
.join(edges)
.where(1).equalTo(0) {
(left,right) => Some((left._1,right._2))
(left, right) => Some((left._1,right._2))
}
.union(prevPaths)
.groupBy(0,1)
.reduce((l,r) => l)
.groupBy(0, 1)
.reduce((l, r) => l)
val terminate = prevPaths
.coGroup(nextPaths)
.where(0).equalTo(0) {
(prev, next, out: Collector[(Long, Long)]) => {
val prevPaths = prev.toList
val prevPaths = prev.toSet
for (n <- next)
if (!prevPaths.contains(n))
out.collect(n)
......@@ -92,7 +91,7 @@ object TransitiveClosureNaive {
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" Usage: TransitiveClosure <edges path> <result path> <max number of iterations>")
}
return true
true
}
private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
......@@ -111,96 +110,3 @@ object TransitiveClosureNaive {
}
}
}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.examples.scala.graph;
//
//import org.apache.flink.client.LocalExecutor
//import org.apache.flink.api.common.Program
//import org.apache.flink.api.common.ProgramDescription
//
//import org.apache.flink.api.scala._
//import org.apache.flink.api.scala.operators._
//
//class TransitiveClosureNaive extends Program with ProgramDescription with Serializable {
//
// def getScalaPlan(numSubTasks: Int, numIterations: Int, verticesInput: String, edgesInput: String, pathsOutput: String) = {
// val vertices = DataSource(verticesInput, DelimitedInputFormat(parseVertex))
// val edges = DataSource(edgesInput, DelimitedInputFormat(parseEdge))
//
// def createClosure(paths: DataSetOLD[Path]) = {
//
// val allNewPaths = paths join edges where { p => p.to } isEqualTo { e => e.from } map joinPaths
// val shortestPaths = allNewPaths union paths groupBy { p => (p.from, p.to) } reduceGroup { _ minBy { _.dist } }
//
// shortestPaths
// }
//
// val transitiveClosure = vertices.iterate(numIterations, createClosure)
//
// val output = transitiveClosure.write(pathsOutput, DelimitedOutputFormat(formatOutput))
//
// val plan = new ScalaPlan(Seq(output), "Transitive Closure (Naive)")
// plan.setDefaultParallelism(numSubTasks)
// plan
// }
//
// def joinPaths = (p1: Path, p2: Path) => (p1, p2) match {
// case (Path(from, _, dist1), Path(_, to, dist2)) => Path(from, to, dist1 + dist2)
// }
//
// case class Path(from: Int, to: Int, dist: Int)
//
// def parseVertex = (line: String) => { val v = line.toInt; Path(v, v, 0) }
//
// val EdgeInputPattern = """(\d+)\|(\d+)""".r
//
// def parseEdge = (line: String) => line match {
// case EdgeInputPattern(from, to) => Path(from.toInt, to.toInt, 1)
// }
//
// def formatOutput = (path: Path) => "%d|%d|%d".format(path.from, path.to, path.dist)
//
// override def getDescription() = {
// "Parameters: <numSubStasks> <numIterations> <vertices> <edges> <output>"
// }
//
// override def getPlan(args: String*) = {
// getScalaPlan(args(0).toInt, args(1).toInt, args(2), args(3), args(4))
// }
//}
//
//object RunTransitiveClosureNaive {
// def main(pArgs: Array[String]) {
// if (pArgs.size < 3) {
// println("usage: [-numIterations <int:2>] -vertices <file> -edges <file> -output <file>")
// return
// }
// val args = Args.parse(pArgs)
// val plan = new TransitiveClosureNaive().getScalaPlan(2, args("numIterations", "10").toInt, args("vertices"), args("edges"), args("output"))
// LocalExecutor.execute(plan)
// }
//}
\ No newline at end of file
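For the semantics being iterated above: paths grow by joining existing paths with edges until an iteration adds nothing new. A plain-Java fixed-point sketch over a made-up edge set, independent of the Flink code:

import java.util.HashSet;
import java.util.Set;

public class TransitiveClosureSketch {
    public static void main(String[] args) {
        // Hypothetical directed edges (from, to), encoded as "from->to".
        Set<String> paths = new HashSet<>();
        paths.add("1->2");
        paths.add("2->3");
        paths.add("3->4");

        // Repeat until no iteration adds a new path -- the same termination
        // condition the coGroup step above checks in a distributed way.
        boolean changed = true;
        while (changed) {
            Set<String> next = new HashSet<>(paths);
            for (String p : paths) {
                String[] pe = p.split("->");
                for (String q : paths) {
                    String[] qe = q.split("->");
                    if (pe[1].equals(qe[0])) {
                        next.add(pe[0] + "->" + qe[1]);
                    }
                }
            }
            changed = next.size() > paths.size();
            paths = next;
        }
        System.out.println(paths); // includes 1->3, 2->4, and 1->4
    }
}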
......@@ -23,8 +23,7 @@ import java.io.Serializable
import org.apache.flink.api.common.functions._
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.example.java.clustering.util.KMeansData
import org.apache.flink.example.java.ml.util.LinearRegressionData
import org.apache.flink.examples.java.ml.util.LinearRegressionData
import scala.collection.JavaConverters._
......
......@@ -18,14 +18,14 @@
package org.apache.flink.examples.scala.relational
import org.apache.flink.api.scala._
import org.apache.flink.example.java.relational.util.WebLogData
import org.apache.flink.examples.java.relational.util.WebLogData
import org.apache.flink.util.Collector
/**
* This program processes web logs and relational data.
* It implements the following relational query:
*
* <code><pre>
* {{{
* SELECT
* r.pageURL,
* r.pageRank,
......@@ -40,13 +40,15 @@ import org.apache.flink.util.Collector
* WHERE v.destUrl = d.url
* AND v.visitDate < [date]
* );
* </pre></code>
* }}}
*
*
* <p>
* Input files are plain text CSV files using the pipe character ('|') as field separator.
* The tables referenced in the query can be generated using the {@link WebLogDataGenerator} and
* The tables referenced in the query can be generated using the
* [[org.apache.flink.examples.java.relational.util.WebLogDataGenerator]] and
* have the following schemas
* <code><pre>
*
* {{{
* CREATE TABLE Documents (
* url VARCHAR(100) PRIMARY KEY,
* contents TEXT );
......@@ -66,133 +68,143 @@ import org.apache.flink.util.Collector
* languageCode VARCHAR(6),
* searchWord VARCHAR(32),
* duration INT );
* </pre></code>
* }}}
*
*
* <p>
* Usage: <code>WebLogAnalysis &lt;documents path&gt; &lt;ranks path&gt; &lt;visits path&gt; &lt;result path&gt;</code><br>
* If no parameters are provided, the program is run with default data from {@link WebLogData}.
* Usage:
* {{{
* WebLogAnalysis <documents path> <ranks path> <visits path> <result path>
* }}}
*
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.examples.java.relational.util.WebLogData]].
*
* <p>
* This example shows how to use:
* <ul>
* <li> tuple data types
* <li> projection and join projection
* <li> the CoGroup transformation for an anti-join
* </ul>
*
* - tuple data types
* - projection and join projection
* - the CoGroup transformation for an anti-join
*
*/
object WebLogAnalysis {
def main(args: Array[String]) {
if (!parseParameters(args)) {
return
}
val env = ExecutionEnvironment.getExecutionEnvironment
val documents = getDocumentsDataSet(env)
val ranks = getRanksDataSet(env)
val visits = getVisitsDataSet(env)
val filteredDocs = documents
.filter (doc => doc._2.contains(" editors ") && doc._2.contains ( " oscillations "))
val filteredRanks = ranks
.filter (rank => rank._1 > 40)
val filteredVisits = visits
.filter (visit => visit._2.substring(0,4).toInt == 2007)
val joinDocsRanks = filteredDocs
.join(filteredRanks)
.where(0).equalTo(1)
.map(_._2)
val result = joinDocsRanks
.coGroup(filteredVisits)
.where(1).equalTo(0)
.apply ((ranks, visits, out:Collector[(Int,String,Int)]) => {
if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
})
result.print()
env.execute("WebLogAnalysis Example")
}
private var fileOutput: Boolean = false
private var documentsPath: String = null
private var ranksPath: String = null
private var visitsPath: String = null
private var outputPath: String = null
private def parseParameters (args: Array[String]) : Boolean = {
if (args.length > 0) {
fileOutput = true;
if (args.length == 4) {
documentsPath = args(0)
ranksPath = args(1)
visitsPath = args(2)
outputPath = args(3)
}
else {
System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>")
return false
}
}
else {
System.out.println("Executing WebLog Analysis example with built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" We provide a data generator to create synthetic input files for this program.")
System.out.println(" Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>")
}
return true;
}
private def getDocumentsDataSet(env: ExecutionEnvironment): DataSet[(String,String)] = {
if (fileOutput) {
env.readCsvFile[(String,String)](
documentsPath,
fieldDelimiter = '|',
includedFields = Array(0,1))
}
else {
val documents = WebLogData.DOCUMENTS map {
case Array(x,y) => (x.asInstanceOf[String],y.asInstanceOf[String])
}
env.fromCollection(documents)
}
}
private def getRanksDataSet(env: ExecutionEnvironment) : DataSet[(Int, String, Int)] = {
if (fileOutput) {
env.readCsvFile[(Int,String,Int)](
ranksPath,
fieldDelimiter = '|',
includedFields = Array(0,1,2))
}
else {
val ranks = WebLogData.RANKS map {
case Array(x,y,z) => (x.asInstanceOf[Int], y.asInstanceOf[String], z.asInstanceOf[Int])
}
env.fromCollection(ranks)
}
}
private def getVisitsDataSet (env: ExecutionEnvironment) : DataSet[(String,String)] = {
if (fileOutput) {
env.readCsvFile[(String,String)](
visitsPath,
fieldDelimiter = '|',
includedFields = Array(0,1))
}
else {
val visits = WebLogData.VISITS map {
case Array(x,y) => (x.asInstanceOf[String], y.asInstanceOf[String])
}
env.fromCollection(visits)
}
}
}
\ No newline at end of file
def main(args: Array[String]) {
if (!parseParameters(args)) {
return
}
val env = ExecutionEnvironment.getExecutionEnvironment
val documents = getDocumentsDataSet(env)
val ranks = getRanksDataSet(env)
val visits = getVisitsDataSet(env)
val filteredDocs = documents
.filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations "))
val filteredRanks = ranks
.filter(rank => rank._1 > 40)
val filteredVisits = visits
.filter(visit => visit._2.substring(0, 4).toInt == 2007)
val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) {
(doc, rank) => Some(rank)
}
val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) {
(ranks, visits, out: Collector[(Int, String, Int)]) =>
if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
}
// emit result
if (fileOutput) {
result.writeAsCsv(outputPath, "\n", "|")
} else {
result.print()
}
env.execute("Scala WebLogAnalysis Example")
}
private var fileOutput: Boolean = false
private var documentsPath: String = null
private var ranksPath: String = null
private var visitsPath: String = null
private var outputPath: String = null
private def parseParameters(args: Array[String]): Boolean = {
if (args.length > 0) {
fileOutput = true
if (args.length == 4) {
documentsPath = args(0)
ranksPath = args(1)
visitsPath = args(2)
outputPath = args(3)
}
else {
System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " +
"<result path>")
return false
}
}
else {
System.out.println("Executing WebLog Analysis example with built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" We provide a data generator to create synthetic input files for this " +
"program.")
System.out.println(" Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " +
"<result path>")
}
true
}
private def getDocumentsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = {
if (fileOutput) {
env.readCsvFile[(String, String)](
documentsPath,
fieldDelimiter = '|',
includedFields = Array(0, 1))
}
else {
val documents = WebLogData.DOCUMENTS map {
case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
}
env.fromCollection(documents)
}
}
private def getRanksDataSet(env: ExecutionEnvironment): DataSet[(Int, String, Int)] = {
if (fileOutput) {
env.readCsvFile[(Int, String, Int)](
ranksPath,
fieldDelimiter = '|',
includedFields = Array(0, 1, 2))
}
else {
val ranks = WebLogData.RANKS map {
case Array(x, y, z) => (x.asInstanceOf[Int], y.asInstanceOf[String], z.asInstanceOf[Int])
}
env.fromCollection(ranks)
}
}
private def getVisitsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = {
if (fileOutput) {
env.readCsvFile[(String, String)](
visitsPath,
fieldDelimiter = '|',
includedFields = Array(1, 2))
}
else {
val visits = WebLogData.VISITS map {
case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
}
env.fromCollection(visits)
}
}
}
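The coGroup-based anti-join used in both versions of this program, sketched in the Java DataSet API with made-up inline data: a left record is kept only when the right group it co-groups with is empty. This assumes the Iterable-based CoGroupFunction signature.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class AntiJoinSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> ranks = env.fromElements(
            new Tuple2<String, Integer>("url_a", 55),
            new Tuple2<String, Integer>("url_b", 77));

        DataSet<Tuple2<String, String>> visits = env.fromElements(
            new Tuple2<String, String>("url_b", "2007-02-01"));

        // Anti-join: emit a rank only if no visit shares its URL.
        DataSet<Tuple2<String, Integer>> unvisited = ranks
            .coGroup(visits).where(0).equalTo(0)
            .with(new CoGroupFunction<Tuple2<String, Integer>,
                                      Tuple2<String, String>,
                                      Tuple2<String, Integer>>() {
                @Override
                public void coGroup(Iterable<Tuple2<String, Integer>> rs,
                                    Iterable<Tuple2<String, String>> vs,
                                    Collector<Tuple2<String, Integer>> out) {
                    if (!vs.iterator().hasNext()) {
                        for (Tuple2<String, Integer> r : rs) {
                            out.collect(r);   // no matching visit: keep it
                        }
                    }
                }
            });

        unvisited.print();                    // emits only ("url_a", 55)
        env.execute("Anti-join sketch");
    }
}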
......@@ -18,7 +18,7 @@
package org.apache.flink.examples.scala.wordcount
import org.apache.flink.api.scala._
import org.apache.flink.example.java.wordcount.util.WordCountData
import org.apache.flink.examples.java.wordcount.util.WordCountData
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram
......
......@@ -30,11 +30,11 @@ import org.apache.flink.compiler.plan.BulkPartialSolutionPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.SinkPlanNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.example.java.graph.PageRankBasic.BuildOutgoingEdgeList;
import org.apache.flink.example.java.graph.PageRankBasic.Dampener;
import org.apache.flink.example.java.graph.PageRankBasic.EpsilonFilter;
import org.apache.flink.example.java.graph.PageRankBasic.JoinVertexWithEdgesMatch;
import org.apache.flink.example.java.graph.PageRankBasic.RankAssigner;
import org.apache.flink.examples.java.graph.PageRankBasic.BuildOutgoingEdgeList;
import org.apache.flink.examples.java.graph.PageRankBasic.Dampener;
import org.apache.flink.examples.java.graph.PageRankBasic.EpsilonFilter;
import org.apache.flink.examples.java.graph.PageRankBasic.JoinVertexWithEdgesMatch;
import org.apache.flink.examples.java.graph.PageRankBasic.RankAssigner;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.test.compiler.util.CompilerTestBase;
......
......@@ -23,7 +23,7 @@ import org.apache.flink.client.program.Client.ProgramAbortException;
import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.example.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.test.compiler.util.CompilerTestBase;
import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
......
......@@ -21,7 +21,7 @@ package org.apache.flink.test.exampleJavaPrograms;
import java.io.BufferedReader;
import org.apache.flink.example.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
......
......@@ -18,7 +18,7 @@
package org.apache.flink.test.exampleJavaPrograms;
import org.apache.flink.example.java.graph.EnumTrianglesBasic;
import org.apache.flink.examples.java.graph.EnumTrianglesBasic;
import org.apache.flink.test.testdata.EnumTriangleData;
import org.apache.flink.test.util.JavaProgramTestBase;
......
......@@ -18,7 +18,7 @@
package org.apache.flink.test.exampleJavaPrograms;
import org.apache.flink.example.java.graph.EnumTrianglesOpt;
import org.apache.flink.examples.java.graph.EnumTrianglesOpt;
import org.apache.flink.test.testdata.EnumTriangleData;
import org.apache.flink.test.util.JavaProgramTestBase;
......
......@@ -24,7 +24,7 @@ import java.util.Collection;
import java.util.LinkedList;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.example.java.graph.PageRankBasic;
import org.apache.flink.examples.java.graph.PageRankBasic;
import org.apache.flink.test.testdata.PageRankData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
......
......@@ -22,7 +22,7 @@ package org.apache.flink.test.exampleJavaPrograms;
import java.io.BufferedReader;
import org.apache.flink.example.java.graph.TransitiveClosureNaive;
import org.apache.flink.examples.java.graph.TransitiveClosureNaive;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.testdata.TransitiveClosureData;
import org.apache.flink.test.util.JavaProgramTestBase;
......
......@@ -19,7 +19,7 @@
package org.apache.flink.test.exampleJavaPrograms;
import org.apache.flink.example.java.relational.WebLogAnalysis;
import org.apache.flink.examples.java.relational.WebLogAnalysis;
import org.apache.flink.test.testdata.WebLogAnalysisData;
import org.apache.flink.test.util.JavaProgramTestBase;
......
......@@ -18,7 +18,7 @@
package org.apache.flink.test.exampleJavaPrograms;
import org.apache.flink.example.java.wordcount.WordCount;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
......
......@@ -23,7 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.example.java.wordcount.WordCount;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.Assert;
......
......@@ -34,8 +34,8 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.example.java.graph.ConnectedComponents.DuplicateValue;
import org.apache.flink.example.java.graph.ConnectedComponents.NeighborWithComponentIDJoin;
import org.apache.flink.examples.java.graph.ConnectedComponents.DuplicateValue;
import org.apache.flink.examples.java.graph.ConnectedComponents.NeighborWithComponentIDJoin;
@SuppressWarnings("serial")
public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase {
......