Commit 4d0f6123 authored by Fabian Hueske

Refactored and unified Java API example jobs

Parent 5a81d9de
......@@ -22,15 +22,102 @@ import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.IterativeDataSet;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.example.java.clustering.util.KMeansData;
/**
* This example implements a basic K-Means clustering algorithm.
*
* <p>
* K-Means is an iterative clustering algorithm and works as follows:<br>
* K-Means is given a set of data points to be clustered and an initial set of <i>K</i> cluster centers.
* In each iteration, the algorithm computes the distance of each data point to each cluster center.
* Each point is assigned to the cluster center which is closest to it.
* Subsequently, each cluster center is moved to the center (<i>mean</i>) of all points that have been assigned to it.
* The moved cluster centers are fed into the next iteration.
* The algorithm terminates after a fixed number of iterations (as in this implementation)
* or if cluster centers do not (significantly) move in an iteration.
*
* <p>
* This implementation works on two-dimensional data points. <br>
* It computes an assignment of data points to cluster centers, i.e.,
* each data point is annotated with the id of the final cluster (center) it belongs to.
*
* <p>
* Input files are plain text files and must be formatted as follows:
* <ul>
* <li>Data points are represented as two double values separated by a blank character.
* Data points are separated by newline characters.<br>
* For example <code>"1.2 2.3\n5.3 7.2\n"</code> gives two data points (x=1.2, y=2.3) and (x=5.3, y=7.2).
* <li>Cluster centers are represented by an integer id and a point value.<br>
* For example <code>"1 6.2 3.2\n2 2.9 5.7\n"</code> gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7).
* </ul>
*
* <p>
* This example shows how to use:
* <ul>
* <li>Bulk iterations
* <li>Broadcast variables in bulk iterations
* <li>Custom Java objects (POJOs)
* </ul>
*/
@SuppressWarnings("serial")
public class KMeans {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
parseParameters(args);
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
DataSet<Point> points = getPointDataSet(env);
DataSet<Centroid> centroids = getCentroidDataSet(env);
// set number of bulk iterations for KMeans algorithm
IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
DataSet<Centroid> newCentroids = points
// compute closest centroid for each point
.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
// count and sum point coordinates for each centroid
.map(new CountAppender())
.groupBy(0).reduce(new CentroidAccumulator())
// compute new centroids from point counts and coordinate sums
.map(new CentroidAverager());
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
DataSet<Tuple2<Integer, Point>> clusteredPoints = points
// assign points to final clusters
.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
// emit result
if(fileOutput) {
clusteredPoints.writeAsCsv(outputPath, "\n", ",");
} else {
clusteredPoints.print();
}
// execute program
env.execute("KMeans Example");
}
// *************************************************************************
// DATA TYPES
// *************************************************************************
/**
* A simple three-dimensional point.
* A simple two-dimensional point.
*/
public static class Point implements Serializable {
......@@ -65,10 +152,13 @@ public class KMeans {
@Override
public String toString() {
return "(" + x + "|" + y + ")";
return x + "," + y;
}
}
/**
* A simple two-dimensional centroid, basically a point with an ID.
*/
public static class Centroid extends Point {
public int id;
......@@ -84,27 +174,50 @@ public class KMeans {
super(p.x, p.y);
this.id = id;
}
@Override
public String toString() {
return id + "," + super.toString();
}
}
/**
* Determines the closest cluster center for a data point.
*/
public static final class SelectNearestCenter extends MapFunction<Point, Tuple3<Integer, Point, Long>> {
private static final long serialVersionUID = 1L;
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/** Converts a Tuple2<Double,Double> into a Point. */
public static final class TuplePointConverter extends MapFunction<Tuple2<Double, Double>, Point> {
@Override
public Point map(Tuple2<Double, Double> t) throws Exception {
return new Point(t.f0, t.f1);
}
}
/** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
public static final class TupleCentroidConverter extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
@Override
public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
return new Centroid(t.f0, t.f1, t.f2);
}
}
/** Determines the closest cluster center for a data point. */
public static final class SelectNearestCenter extends MapFunction<Point, Tuple2<Integer, Point>> {
private Collection<Centroid> centroids;
/** Reads the centroid values from the broadcast variable into a collection.*/
/** Reads the centroid values from a broadcast variable into a collection. */
@Override
public void open(Configuration parameters) throws Exception {
this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
}
@Override
public Tuple3<Integer, Point, Long> map(Point p) throws Exception {
public Tuple2<Integer, Point> map(Point p) throws Exception {
double nearestDistance = Double.MAX_VALUE;
int centroidId = 0;
double minDistance = Double.MAX_VALUE;
int closestCentroidId = -1;
// check all cluster centers
for (Centroid centroid : centroids) {
......@@ -112,32 +225,37 @@ public class KMeans {
double distance = p.euclideanDistance(centroid);
// update nearest cluster if necessary
if (distance < nearestDistance) {
nearestDistance = distance;
centroidId = centroid.id;
if (distance < minDistance) {
minDistance = distance;
closestCentroidId = centroid.id;
}
}
// emit a new record with the center id and the data point.
return new Tuple3<Integer, Point, Long>(centroidId, p, 1L);
return new Tuple2<Integer, Point>(closestCentroidId, p);
}
}
/** Appends a count variable to the tuple. */
public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
@Override
public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) {
return new Tuple3<Integer, Point, Long>(t.f0, t.f1, 1L);
}
}
/** The input and output types are (centroid-id, point-sum, count) */
public static class CentroidAccumulator extends ReduceFunction<Tuple3<Integer, Point, Long>> {
private static final long serialVersionUID = 1L;
/** Sums and counts point coordinates. */
public static final class CentroidAccumulator extends ReduceFunction<Tuple3<Integer, Point, Long>> {
@Override
public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
}
}
/** The input and output types are (centroid-id, point-sum, count) */
public static class CentroidAverager extends MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
private static final long serialVersionUID = 1L;
/** Computes new centroid from coordinate sum and count of points. */
public static final class CentroidAverager extends MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
@Override
public Centroid map(Tuple3<Integer, Point, Long> value) {
......@@ -145,30 +263,60 @@ public class KMeans {
}
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Point> points = env.fromElements(new Point(-3.78, -42.01), new Point(-45.96, 30.67), new Point(8.96, -41.58),
new Point(-22.96, 40.73), new Point(4.79, -35.58), new Point(-41.27, 32.42),
new Point(-2.61, -30.43), new Point(-23.33, 26.23), new Point(-9.22, -31.23),
new Point(-45.37, 36.42));
DataSet<Centroid> centroids = env.fromElements(new Centroid(0, 43.28, 47.89),
new Centroid(1, -0.06, -48.97));
IterativeDataSet<Centroid> loop = centroids.iterate(20);
DataSet<Centroid> newCentroids = points
.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
.groupBy(0).reduce(new CentroidAccumulator())
.map(new CentroidAverager());
DataSet<Centroid> result = loop.closeWith(newCentroids);
result.print();
private static boolean fileOutput = false;
private static String pointsPath = null;
private static String centersPath = null;
private static String outputPath = null;
private static int numIterations = 10;
private static void parseParameters(String[] programArguments) {
env.execute("KMeans 2d example");
if(programArguments.length > 0) {
// parse input arguments
fileOutput = true;
if(programArguments.length == 4) {
pointsPath = programArguments[0];
centersPath = programArguments[1];
outputPath = programArguments[2];
numIterations = Integer.parseInt(programArguments[3]);
} else {
System.err.println("Usage: KMeans <points path> <centers path> <result path> <num iterations>");
System.exit(1);
}
} else {
System.out.println("Executing K-Means example with default parameters and built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" Usage: KMeans <points path> <centers path> <result path> <num iterations>");
}
}
private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) {
if(fileOutput) {
// read points from CSV file
return env.readCsvFile(pointsPath)
.fieldDelimiter(' ')
.includeFields(true, true)
.types(Double.class, Double.class)
.map(new TuplePointConverter());
} else {
return KMeansData.getDefaultPointDataSet(env);
}
}
private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) {
if(fileOutput) {
return env.readCsvFile(centersPath)
.fieldDelimiter(' ')
.includeFields(true, true, true)
.types(Integer.class, Double.class, Double.class)
.map(new TupleCentroidConverter());
} else {
return KMeansData.getDefaultCentroidDataSet(env);
}
}
}
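The hunks above collapse the bodies of Point's arithmetic helpers and of CentroidAverager. A minimal sketch of what they plausibly contain, inferred from their call sites in this diff (the div helper and the exact signatures are assumptions, not part of the committed code):

    public static class Point implements Serializable {

        public double x, y;

        public Point() {}

        public Point(double x, double y) {
            this.x = x;
            this.y = y;
        }

        // used by CentroidAccumulator to sum point coordinates
        public Point add(Point other) {
            x += other.x;
            y += other.y;
            return this;
        }

        // assumed helper: divides a coordinate sum by a point count
        public Point div(long val) {
            x /= val;
            y /= val;
            return this;
        }

        // used by SelectNearestCenter to find the closest centroid
        public double euclideanDistance(Point other) {
            return Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y));
        }
    }

With such helpers, CentroidAverager's collapsed map method would reduce to something like `return new Centroid(value.f0, value.f1.div(value.f2));`.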
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.example.java.clustering.util;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.example.java.clustering.KMeans.Centroid;
import eu.stratosphere.example.java.clustering.KMeans.Point;
public class KMeansData {
public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env) {
return env.fromElements(
new Centroid(1, -31.85, -44.77),
new Centroid(2, 35.16, 17.46),
new Centroid(3, -5.16, 21.93),
new Centroid(4, -24.06, 6.81)
);
}
public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env) {
return env.fromElements(
new Point(-14.22, -48.01),
new Point(-22.78, 37.10),
new Point(56.18, -42.99),
new Point(35.04, 50.29),
new Point(-9.53, -46.26),
new Point(-34.35, 48.25),
new Point(55.82, -57.49),
new Point(21.03, 54.64),
new Point(-13.63, -42.26),
new Point(-36.57, 32.63),
new Point(50.65, -52.40),
new Point(24.48, 34.04),
new Point(-2.69, -36.02),
new Point(-38.80, 36.58),
new Point(24.00, -53.74),
new Point(32.41, 24.96),
new Point(-4.32, -56.92),
new Point(-22.68, 29.42),
new Point(59.02, -39.56),
new Point(24.47, 45.07),
new Point(5.23, -41.20),
new Point(-23.00, 38.15),
new Point(44.55, -51.50),
new Point(14.62, 59.06),
new Point(7.41, -56.05),
new Point(-26.63, 28.97),
new Point(47.37, -44.72),
new Point(29.07, 51.06),
new Point(0.59, -31.89),
new Point(-39.09, 20.78),
new Point(42.97, -48.98),
new Point(34.36, 49.08),
new Point(-21.91, -49.01),
new Point(-46.68, 46.04),
new Point(48.52, -43.67),
new Point(30.05, 49.25),
new Point(4.03, -43.56),
new Point(-37.85, 41.72),
new Point(38.24, -48.32),
new Point(20.83, 57.85)
);
}
}
......@@ -11,7 +11,7 @@
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.example.java.clustering.generator;
package eu.stratosphere.example.java.clustering.util;
import java.io.BufferedWriter;
import java.io.File;
......@@ -21,35 +21,48 @@ import java.text.DecimalFormat;
import java.util.Locale;
import java.util.Random;
public class KMeansSampleDataGenerator {
import eu.stratosphere.example.java.clustering.KMeans;
/**
* Generates data for the {@link KMeans} example program.
*/
public class KMeansDataGenerator {
static {
Locale.setDefault(Locale.US);
}
private static final String CENTERS_FILE = "centers";
private static final String POINTS_FILE = "points";
private static final long DEFAULT_SEED = 4650285087650871364L;
private static final double DEFAULT_VALUE_RANGE = 100.0;
private static final double RELATIVE_STDDEV = 0.08;
private static final int DIMENSIONALITY = 2;
private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
private static final char DELIMITER = '|';
private static final char DELIMITER = ' ';
/**
* Main method to generate data for the {@link KMeans} example program.
* <p>
* The generator creates two files:
* <ul>
* <li><code>{tmp.dir}/points</code> for the data points
* <li><code>{tmp.dir}/centers</code> for the cluster centers
* </ul>
*
* @param args
* <ol>
* <li>Int: Number of data points
* <li>Int: Number of cluster centers
* <li><b>Optional</b> Double: Standard deviation of data points
* <li><b>Optional</b> Double: Value range of cluster centers
* <li><b>Optional</b> Long: Random seed
* </ol>
*/
public static void main(String[] args) throws IOException {
// check parameter count
if (args.length < 3) {
if (args.length < 2) {
System.out.println("KMeansDataGenerator <numberOfDataPoints> <numberOfClusterCenters> [<relative stddev>] [<centroid range>] [<seed>]");
System.exit(1);
}
......@@ -57,23 +70,21 @@ public class KMeansSampleDataGenerator {
// parse parameters
final int numDataPoints = Integer.parseInt(args[0]);
final int k = Integer.parseInt(args[1]);
final double stddev = args.length > 2 ? Double.parseDouble(args[2]) : RELATIVE_STDDEV;
final double range = args.length > 3 ? Double.parseDouble(args[3]) : DEFAULT_VALUE_RANGE;
final long firstSeed = args.length > 4 ? Long.parseLong(args[4]) : DEFAULT_SEED;
// generate the centers first
final double absoluteStdDev = stddev * range;
final Random random = new Random(firstSeed);
final String tmpDir = System.getProperty("java.io.tmpdir");
// the means for our gaussian distributions
// the means around which data points are distributed
final double[][] means = uniformRandomCenters(random, k, DIMENSIONALITY, range);
// write the points out
BufferedWriter pointsOut = null;
try {
pointsOut = new BufferedWriter(new FileWriter(new File(POINTS_FILE)));
pointsOut = new BufferedWriter(new FileWriter(new File(tmpDir+"/"+POINTS_FILE)));
StringBuilder buffer = new StringBuilder();
double[] point = new double[DIMENSIONALITY];
......@@ -85,7 +96,7 @@ public class KMeansSampleDataGenerator {
for (int d = 0; d < DIMENSIONALITY; d++) {
point[d] = (random.nextGaussian() * absoluteStdDev) + centroid[d];
}
write(i, point, buffer, pointsOut);
writePoint(point, buffer, pointsOut);
nextCentroid = (nextCentroid + 1) % k;
}
}
......@@ -94,18 +105,17 @@ public class KMeansSampleDataGenerator {
pointsOut.close();
}
}
// write the uniformly distributed centers to a file
BufferedWriter centersOut = null;
try {
centersOut = new BufferedWriter(new FileWriter(new File(CENTERS_FILE)));
centersOut = new BufferedWriter(new FileWriter(new File(tmpDir+"/"+CENTERS_FILE)));
StringBuilder buffer = new StringBuilder();
double[][] centers = uniformRandomCenters(random, k, DIMENSIONALITY, range);
for (int i = 0; i < k; i++) {
write(i + 1, centers[i], buffer, centersOut);
writeCenter(i + 1, centers[i], buffer, centersOut);
}
}
finally {
......@@ -113,6 +123,9 @@ public class KMeansSampleDataGenerator {
centersOut.close();
}
}
System.out.println("Wrote "+numDataPoints+" data points to "+tmpDir+"/"+POINTS_FILE);
System.out.println("Wrote "+k+" cluster centers to "+tmpDir+"/"+CENTERS_FILE);
}
private static final double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
......@@ -127,16 +140,34 @@ public class KMeansSampleDataGenerator {
return points;
}
private static void write(long id, double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
private static void writePoint(double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
buffer.setLength(0);
// write coordinates
for (int j = 0; j < coordinates.length; j++) {
buffer.append(FORMAT.format(coordinates[j]));
if(j < coordinates.length - 1) {
buffer.append(DELIMITER);
}
}
out.write(buffer.toString());
out.newLine();
}
private static void writeCenter(long id, double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
buffer.setLength(0);
// write id
buffer.append(id);
buffer.append(DELIMITER);
// append all coordinates
// write coordinates
for (int j = 0; j < coordinates.length; j++) {
buffer.append(FORMAT.format(coordinates[j]));
buffer.append('|');
if(j < coordinates.length - 1) {
buffer.append(DELIMITER);
}
}
out.write(buffer.toString());
......
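Taken together with the KMeans job above, an end-to-end run of the refactored example might look as follows (argument values and paths are illustrative, not taken from this diff):

    // generate 500 data points around 10 random centers;
    // files are written to {tmp.dir}/points and {tmp.dir}/centers
    KMeansDataGenerator.main(new String[] { "500", "10" });

    // cluster the generated points with 20 bulk iterations and write the result as CSV
    KMeans.main(new String[] { "/tmp/points", "/tmp/centers", "/tmp/kmeans-result", "20" });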
......@@ -23,6 +23,7 @@ import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.example.java.graph.util.ConnectedComponentsData;
import eu.stratosphere.util.Collector;
/**
......@@ -35,33 +36,41 @@ import eu.stratosphere.util.Collector;
* their current component ids, and the workset as the changed vertices. Because we see all vertices initially as
* changed, the initial workset and the initial solution set are identical. Consequently, the delta to the
* solution set is also the next workset.
*
* <p>
* Input files are plain text files and must be formatted as follows:
* <ul>
* <li>Vertices are represented as IDs, separated by new-line characters.<br>
* For example <code>"1\n2\n12\n42\n63\n"</code> gives five vertices (1), (2), (12), (42), and (63).
* <li>Edges are represented as pairs of vertex IDs which are separated by space
* characters. Edges are separated by new-line characters.<br>
* For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
* </ul>
*
* <p>
* This example shows how to use:
* <ul>
* <li>Delta Iterations
* <li>Generic-typed Functions
* </ul>
*/
@SuppressWarnings("serial")
public class ConnectedComponents implements ProgramDescription {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String... args) throws Exception {
if (args.length < 4) {
System.err.println("Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>");
return;
}
final int maxIterations = Integer.parseInt(args[3]);
parseParameters(args);
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> vertices = env.readCsvFile(args[0]).types(Long.class).map(new MapFunction<Tuple1<Long>, Long>() {
public Long map(Tuple1<Long> value) { return value.f0; } });
DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(args[1]).fieldDelimiter(' ').types(Long.class, Long.class);
DataSet<Tuple2<Long, Long>> result = doConnectedComponents(vertices, edges, maxIterations);
result.writeAsCsv(args[2], "\n", " ");
env.execute("Connected Components");
}
public static DataSet<Tuple2<Long, Long>> doConnectedComponents(DataSet<Long> vertices, DataSet<Tuple2<Long, Long>> edges, int maxIterations) {
// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
// assign the initial components (equal to the vertex id)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
......@@ -77,9 +86,24 @@ public class ConnectedComponents implements ProgramDescription {
.flatMap(new ComponentIdFilter());
// close the delta iteration (delta and new workset are identical)
return iteration.closeWith(changes, changes);
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
// emit result
if(fileOutput) {
result.writeAsCsv(outputPath, "\n", " ");
} else {
result.print();
}
// execute program
env.execute("Connected Components Example");
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
* Function that turns a value into a 2-tuple where both fields are that value.
*/
......@@ -121,4 +145,59 @@ public class ConnectedComponents implements ProgramDescription {
public String getDescription() {
return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
private static boolean fileOutput = false;
private static String verticesPath = null;
private static String edgesPath = null;
private static String outputPath = null;
private static int maxIterations = 10;
private static void parseParameters(String[] programArguments) {
if(programArguments.length > 0) {
// parse input arguments
fileOutput = true;
if(programArguments.length == 4) {
verticesPath = programArguments[0];
edgesPath = programArguments[1];
outputPath = programArguments[2];
maxIterations = Integer.parseInt(programArguments[3]);
} else {
System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
System.exit(1);
}
} else {
System.out.println("Executing Connected Components example with default parameters and built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
}
}
private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) {
if(fileOutput) {
return env.readCsvFile(verticesPath).types(Long.class)
.map(
new MapFunction<Tuple1<Long>, Long>() {
public Long map(Tuple1<Long> value) { return value.f0; }
});
} else {
return ConnectedComponentsData.getDefaultVertexDataSet(env);
}
}
private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
if(fileOutput) {
return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
} else {
return ConnectedComponentsData.getDefaultEdgeDataSet(env);
}
}
}
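The hunk above collapses the body of DuplicateValue. Given its Javadoc ("turns a value into a 2-tuple where both fields are that value") and its use with Long vertex ids, it can be sketched as follows (a reconstruction, not the committed code):

    /** Assigns each vertex its own id as its initial component id. */
    public static final class DuplicateValue<T> extends MapFunction<T, Tuple2<T, T>> {
        @Override
        public Tuple2<T, T> map(T vertex) {
            return new Tuple2<T, T>(vertex, vertex);
        }
    }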
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.example.java.triangles;
package eu.stratosphere.example.java.graph;
import java.util.ArrayList;
import java.util.Iterator;
......@@ -24,40 +24,104 @@ import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.example.java.triangles.util.EdgeData;
import eu.stratosphere.example.java.triangles.util.EdgeDataTypes.Edge;
import eu.stratosphere.example.java.triangles.util.EdgeDataTypes.Triad;
import eu.stratosphere.example.java.triangles.util.EdgeDataTypes.TupleEdgeConverter;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.example.java.graph.util.EnumTrianglesData;
import eu.stratosphere.example.java.graph.util.EnumTrianglesDataTypes.Edge;
import eu.stratosphere.example.java.graph.util.EnumTrianglesDataTypes.Triad;
import eu.stratosphere.util.Collector;
/**
* Triangle enumeration is a preprocessing step to find closely connected parts in graphs.
* A triangle consists of three edges that connect three vertices with each other.
*
* <p>
* The algorithm works as follows:
* It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
* that are connected by two edges. Finally, all triads are filtered for which no third edge exists
* that closes the triangle.
*
* This implementation assumes that edges are represented as pairs of vertices and
* vertices are represented as Integer IDs.
*
* The lines of input files need to have the following format:
* "<INT: vertexId1>,<INT: vertexId2>\n"
*
* For example the input:
* "10,20\n10,30\n20,30\n"
* defines three edges (10,20), (10,30), (20,30) which build a triangle.
* <p>
* Input files are plain text files and must be formatted as follows:
* <ul>
* <li>Edges are represented as pairs of vertex IDs which are separated by space
* characters. Edges are separated by new-line characters.<br>
* For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63),
* of which the first three form a triangle:
* </ul>
* <pre>
* (1)
* / \
* (2)-(12)
* </pre>
*
* <p>
* This example shows how to use:
* <ul>
* <li>Custom Java objects which extend Tuple
* <li>Group Sorting
* </ul>
*
*/
@SuppressWarnings("serial")
public class EnumTrianglesBasic {
/**
* Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second.
*/
static boolean fileOutput = false;
static String edgePath = null;
static String outputPath = null;
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
parseParameters(args);
// set up execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read input data
DataSet<Edge> edges = getEdgeDataSet(env);
// project edges by vertex id
DataSet<Edge> edgesById = edges
.map(new EdgeByIdProjector());
DataSet<Triad> triangles = edgesById
// build triads
.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
// filter triads
.join(edgesById).where(Triad.V2, Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter());
// emit result
if(fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",");
} else {
triangles.print();
}
// execute program
env.execute("Basic Triangle Enumeration Example");
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/** Converts a Tuple2 into an Edge */
public static class TupleEdgeConverter extends MapFunction<Tuple2<Integer, Integer>, Edge> {
private final Edge outEdge = new Edge();
@Override
public Edge map(Tuple2<Integer, Integer> t) throws Exception {
outEdge.copyVerticesFromTuple2(t);
return outEdge;
}
}
/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
private static class EdgeByIdProjector extends MapFunction<Edge, Edge> {
private static final long serialVersionUID = 1L;
@Override
public Edge map(Edge inEdge) throws Exception {
......@@ -77,8 +141,6 @@ public class EnumTrianglesBasic {
* Assumes that input edges share the first vertex and are in ascending order of the second vertex.
*/
private static class TriadBuilder extends GroupReduceFunction<Edge, Triad> {
private static final long serialVersionUID = 1L;
private final List<Integer> vertices = new ArrayList<Integer>();
private final Triad outTriad = new Triad();
......@@ -108,11 +170,8 @@ public class EnumTrianglesBasic {
}
}
/**
* Filters triads (three vertices connected by two edges) without a closing third edge.
*/
/** Filters triads (three vertices connected by two edges) without a closing third edge. */
private static class TriadFilter extends JoinFunction<Triad, Edge, Triad> {
private static final long serialVersionUID = 1L;
@Override
public Triad join(Triad triad, Edge edge) throws Exception {
......@@ -120,51 +179,39 @@ public class EnumTrianglesBasic {
}
}
public static void main(String[] args) throws Exception {
String edgePath = "TESTDATA";
String outPath = "STDOUT";
// parse input arguments
if(args.length > 0) {
edgePath = args[0];
}
if(args.length > 1) {
outPath = args[1];
}
// set up execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// *************************************************************************
// UTIL METHODS
// *************************************************************************
// read input data
DataSet<Edge> edges;
if(edgePath.equals("TESTDATA")) {
edges = EdgeData.getDefaultEdgeDataSet(env);
private static void parseParameters(String[] args) {
if(args.length > 0) {
// parse input arguments
fileOutput = true;
if(args.length == 2) {
edgePath = args[0];
outputPath = args[1];
} else {
System.err.println("Usage: EnumTriangleBasic <edge path> <result path>");
System.exit(1);
}
} else {
edges = env.readCsvFile(edgePath)
.fieldDelimiter(',')
.includeFields(true, true)
.types(Integer.class, Integer.class)
.map(new TupleEdgeConverter());
System.out.println("Executing Enum Triangles Basic example with built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>");
}
// project edges by vertex id
DataSet<Edge> edgesById = edges
.map(new EdgeByIdProjector());
// build and filter triads
DataSet<Triad> triangles = edgesById
.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
.join(edgesById).where(Triad.V2, Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter());
// emit triangles
if(outPath.equals("STDOUT")) {
triangles.print();
}
private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
if(fileOutput) {
return env.readCsvFile(edgePath)
.fieldDelimiter(' ')
.includeFields(true, true)
.types(Integer.class, Integer.class)
.map(new TupleEdgeConverter());
} else {
triangles.writeAsCsv(outPath, "\n", ",");
return EnumTrianglesData.getDefaultEdgeDataSet(env);
}
// execute program
env.execute();
}
}
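TriadBuilder's body is collapsed in the hunk above. A sketch of the pairing logic its Javadoc describes, assuming conventional accessor names on Edge and Triad (a reconstruction under those assumptions, not the committed code):

    private static class TriadBuilder extends GroupReduceFunction<Edge, Triad> {
        private final List<Integer> vertices = new ArrayList<Integer>();
        private final Triad outTriad = new Triad();

        @Override
        public void reduce(Iterator<Edge> edges, Collector<Triad> out) throws Exception {
            // all edges in the group share the same first vertex
            final Edge firstEdge = edges.next();
            outTriad.setFirstVertex(firstEdge.getFirstVertex());

            vertices.clear();
            vertices.add(firstEdge.getSecondVertex());

            // pair each new second vertex with all smaller ones seen so far
            while (edges.hasNext()) {
                Integer higherVertexId = edges.next().getSecondVertex();
                for (Integer lowerVertexId : vertices) {
                    outTriad.setSecondVertex(lowerVertexId);
                    outTriad.setThirdVertex(higherVertexId);
                    out.collect(outTriad);
                }
                vertices.add(higherVertexId);
            }
        }
    }

The subsequent join with edgesById then keeps a triad (v1, v2, v3) only if the closing edge (v2, v3) actually exists.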
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.example.java.triangles;
package eu.stratosphere.example.java.graph;
import java.util.ArrayList;
import java.util.Iterator;
......@@ -26,46 +26,117 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
import eu.stratosphere.example.java.triangles.util.EdgeData;
import eu.stratosphere.example.java.triangles.util.EdgeDataTypes.Edge;
import eu.stratosphere.example.java.triangles.util.EdgeDataTypes.EdgeWithDegrees;
import eu.stratosphere.example.java.triangles.util.EdgeDataTypes.Triad;
import eu.stratosphere.example.java.triangles.util.EdgeDataTypes.TupleEdgeConverter;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.example.java.graph.util.EnumTrianglesData;
import eu.stratosphere.example.java.graph.util.EnumTrianglesDataTypes.Edge;
import eu.stratosphere.example.java.graph.util.EnumTrianglesDataTypes.EdgeWithDegrees;
import eu.stratosphere.example.java.graph.util.EnumTrianglesDataTypes.Triad;
import eu.stratosphere.util.Collector;
/**
* Triangle enumeration is a preprocessing step to find closely connected parts in graphs.
* A triangle consists of three edges that connect three vertices with each other.
*
* <p>
* The basic algorithm works as follows:
* It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
* that are connected by two edges. Finally, all triads are filtered for which no third edge exists
* that closes the triangle.
*
* For a group of n edges that share a common vertex, the number of built triads is quadratic ((n*(n-1))/2).
* <p>
* For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
* Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to
* reduce the number of triads.
* This implementation extends the basic algorithm by computing output degrees of edge vertices and
* grouping edges on the vertex with the smaller degree.
*
* This implementation assumes that edges are represented as pairs of vertices and
* vertices are represented as Integer IDs.
*
* The lines of input files need to have the following format:
* "<INT: vertexId1>,<INT: vertexId2>\n"
* <p>
* Input files are plain text files and must be formatted as follows:
* <ul>
* <li>Edges are represented as pairs of vertex IDs which are separated by space
* characters. Edges are separated by new-line characters.<br>
* For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63),
* of which the first three form a triangle:
* </ul>
* <pre>
* (1)
* / \
* (2)-(12)
* </pre>
*
* For example the input:
* "10,20\n10,30\n20,30\n"
* defines three edges (10,20), (10,30), (20,30) which build a triangle.
* <p>
* This example shows how to use:
* <ul>
* <li>Custom Java objects which extend Tuple
* <li>Group Sorting
* </ul>
*
*/
@SuppressWarnings("serial")
public class EnumTrianglesOpt {
/**
* Emits for an edge the original edge and its switched version.
*/
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
parseParameters(args);
// set up execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read input data
DataSet<Edge> edges = getEdgeDataSet(env);
// annotate edges with degrees
DataSet<EdgeWithDegrees> edgesWithDegrees = edges
.flatMap(new EdgeDuplicator())
.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new DegreeCounter())
.groupBy(EdgeWithDegrees.V1,EdgeWithDegrees.V2).reduce(new DegreeJoiner());
// project edges by degrees
DataSet<Edge> edgesByDegree = edgesWithDegrees
.map(new EdgeByDegreeProjector());
// project edges by vertex id
DataSet<Edge> edgesById = edgesByDegree
.map(new EdgeByIdProjector());
DataSet<Triad> triangles = edgesByDegree
// build triads
.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
// filter triads
.join(edgesById).where(Triad.V2,Triad.V3).equalTo(Edge.V1,Edge.V2).with(new TriadFilter());
// emit result
if(fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",");
} else {
triangles.print();
}
// execute program
env.execute("Triangle Enumeration Example");
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/** Converts a Tuple2 into an Edge */
public static class TupleEdgeConverter extends MapFunction<Tuple2<Integer, Integer>, Edge> {
private final Edge outEdge = new Edge();
@Override
public Edge map(Tuple2<Integer, Integer> t) throws Exception {
outEdge.copyVerticesFromTuple2(t);
return outEdge;
}
}
/** Emits for an edge the original edge and its switched version. */
private static class EdgeDuplicator extends FlatMapFunction<Edge, Edge> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Edge edge, Collector<Edge> out) throws Exception {
......@@ -81,7 +152,6 @@ public class EnumTrianglesOpt {
* For each emitted edge, the first vertex is the vertex with the smaller id.
*/
private static class DegreeCounter extends GroupReduceFunction<Edge, EdgeWithDegrees> {
private static final long serialVersionUID = 1L;
final ArrayList<Integer> otherVertices = new ArrayList<Integer>();
final EdgeWithDegrees outputEdge = new EdgeWithDegrees();
......@@ -130,7 +200,6 @@ public class EnumTrianglesOpt {
* degree annotation.
*/
private static class DegreeJoiner extends ReduceFunction<EdgeWithDegrees> {
private static final long serialVersionUID = 1L;
private final EdgeWithDegrees outEdge = new EdgeWithDegrees();
@Override
......@@ -149,11 +218,8 @@ public class EnumTrianglesOpt {
}
}
/**
* Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree.
*/
/** Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree. */
private static class EdgeByDegreeProjector extends MapFunction<EdgeWithDegrees, Edge> {
private static final long serialVersionUID = 1L;
private final Edge outEdge = new Edge();
......@@ -173,11 +239,8 @@ public class EnumTrianglesOpt {
}
}
/**
* Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second.
*/
/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
private static class EdgeByIdProjector extends MapFunction<Edge, Edge> {
private static final long serialVersionUID = 1L;
@Override
public Edge map(Edge inEdge) throws Exception {
......@@ -197,7 +260,6 @@ public class EnumTrianglesOpt {
* Assumes that input edges share the first vertex and are in ascending order of the second vertex.
*/
private static class TriadBuilder extends GroupReduceFunction<Edge, Triad> {
private static final long serialVersionUID = 1L;
private final List<Integer> vertices = new ArrayList<Integer>();
private final Triad outTriad = new Triad();
......@@ -228,11 +290,8 @@ public class EnumTrianglesOpt {
}
}
/**
* Filters triads (three vertices connected by two edges) without a closing third edge.
*/
/** Filters triads (three vertices connected by two edges) without a closing third edge. */
private static class TriadFilter extends JoinFunction<Triad, Edge, Triad> {
private static final long serialVersionUID = 1L;
@Override
public Triad join(Triad triad, Edge edge) throws Exception {
......@@ -240,60 +299,43 @@ public class EnumTrianglesOpt {
}
}
public static void main(String[] args) throws Exception {
String edgePath = "TESTDATA";
String outPath = "STDOUT";
// *************************************************************************
// UTIL METHODS
// *************************************************************************
private static boolean fileOutput = false;
private static String edgePath = null;
private static String outputPath = null;
private static void parseParameters(String[] args) {
// parse input arguments
if(args.length > 0) {
edgePath = args[0];
}
if(args.length > 1) {
outPath = args[1];
}
// set up execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read input data
DataSet<Edge> edges;
if(edgePath.equals("TESTDATA")) {
edges = EdgeData.getDefaultEdgeDataSet(env);
// parse input arguments
fileOutput = true;
if(args.length == 2) {
edgePath = args[0];
outputPath = args[1];
} else {
System.err.println("Usage: EnumTriangleBasic <edge path> <result path>");
System.exit(1);
}
} else {
edges = env.readCsvFile(edgePath)
.fieldDelimiter(',')
.includeFields(true, true)
.types(Integer.class, Integer.class)
.map(new TupleEdgeConverter());
System.out.println("Executing Enum Triangles Opt example with built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>");
}
// annotate edges with degrees
DataSet<EdgeWithDegrees> edgesWithDegrees = edges
.flatMap(new EdgeDuplicator())
.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new DegreeCounter())
.groupBy(EdgeWithDegrees.V1,EdgeWithDegrees.V2).reduce(new DegreeJoiner());
// project edges by degrees
DataSet<Edge> edgesByDegree = edgesWithDegrees
.map(new EdgeByDegreeProjector());
// project edges by vertex id
DataSet<Edge> edgesById = edgesByDegree
.map(new EdgeByIdProjector());
// build and filter triads
DataSet<Triad> triangles = edgesByDegree
.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
.join(edgesById).where(Triad.V2,Triad.V3).equalTo(Edge.V1,Edge.V2).with(new TriadFilter());
// emit triangles
if(outPath.equals("STDOUT")) {
triangles.print();
}
private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
if(fileOutput) {
return env.readCsvFile(edgePath)
.fieldDelimiter(' ')
.includeFields(true, true)
.types(Integer.class, Integer.class)
.map(new TupleEdgeConverter());
} else {
triangles.writeAsCsv(outPath, "\n", ",");
return EnumTrianglesData.getDefaultEdgeDataSet(env);
}
// execute program
env.execute();
}
}
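The collapsed EdgeByDegreeProjector is the heart of the optimization: a group of n edges yields n*(n-1)/2 triads (45 for n=10, but 4950 for n=100), so putting the smaller-degree endpoint first keeps group sizes small for hub vertices. A sketch of the projection, with accessor names assumed (not taken from this diff):

    private static class EdgeByDegreeProjector extends MapFunction<EdgeWithDegrees, Edge> {
        private final Edge outEdge = new Edge();

        @Override
        public Edge map(EdgeWithDegrees inEdge) throws Exception {
            // put the vertex with the smaller degree first, so triads are built on small groups
            if (inEdge.getFirstDegree() < inEdge.getSecondDegree()) {
                outEdge.setVertices(inEdge.getFirstVertex(), inEdge.getSecondVertex());
            } else {
                outEdge.setVertices(inEdge.getSecondVertex(), inEdge.getFirstVertex());
            }
            return outEdge;
        }
    }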
......@@ -27,68 +27,104 @@ import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.example.java.graph.util.PageRankData;
import eu.stratosphere.util.Collector;
/**
* A basic implementation of the page rank algorithm using a bulk iteration.
*
* <p>
* This implementation requires a set of pages (vertices) with associated ranks and a set
* of directed links (edges) as input and works as follows. <br>
* In each iteration, the rank of every page is evenly distributed to all pages it points to.
* Each page collects the partial ranks of all pages that point to it, sums them up, and applies a dampening factor to the sum.
* The result is the new rank of the page. A new iteration is started with the new ranks of all pages.
* This implementation terminates after a fixed number of iterations.
*
* <p>
* Input files are plain text files and must be formatted as follows:
* <ul>
* <li>Pages are represented as a (long) ID and a (double) rank, separated by new-line characters.<br>
* For example <code>"1 0.4\n2 0.3\n12 0.15\n42 0.05\n63 0.1\n"</code> gives five pages with associated ranks
* (1, 0.4), (2, 0.3), (12, 0.15), (42, 0.05), and (63, 0.1). Ranks should sum up to 1.0.
* <li>Page links are represented as pairs of page IDs which are separated by space
* characters. Edges are separated by new-line characters.<br>
* For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (directed) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
* For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
* </ul>
*
* <p>
* This example shows how to use:
* <ul>
* <li>Bulk Iterations
* <li>Default Join
* </ul>
*
*
*/
@SuppressWarnings("serial")
public class SimplePageRank {
public class PageRankBasic {
private static final double DAMPENING_FACTOR = 0.85;
private static final double EPSILON = 0.0001;
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
if (args.length < 5) {
System.err.println("Usage: SimpePageRank <vertex with initial rank input> <edges path> <output path> <num vertices> <num iterations>");
return;
}
final String pageWithRankInputPath = args[0];
final String adjacencyListInputPath = args[1];
final String outputPath = args[2];
final int numVertices = Integer.parseInt(args[3]);
final int maxIterations = Integer.parseInt(args[4]);
runPageRank(pageWithRankInputPath, adjacencyListInputPath, outputPath, numVertices, maxIterations);
}
public static void runPageRank(String verticesPath, String edgesPath, String outputPath, int numVertices, int maxIterations) throws Exception {
parseParameters(args);
// set up execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// vertices: vertexID, initialRank
DataSet<Tuple2<Long, Double>> pageWithRankInput = env.readCsvFile(verticesPath).types(Long.class, Double.class);
// edge adjacency list: vertexID, vertexID[]
DataSet<Tuple2<Long, Long[]>> adjacencyListInput = env.readCsvFile(edgesPath).types(Long.class, Long.class)
.groupBy(0).reduceGroup(new OutgoingEdgeCounter());
// get input data
DataSet<Tuple2<Long, Double>> pageWithRankInput = getPageWithRankDataSet(env);
DataSet<Tuple2<Long, Long>> edgeInput = getEdgeDataSet(env);
// build adjacency list from edge input
DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
edgeInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pageWithRankInput.iterate(maxIterations);
DataSet<Tuple2<Long, Double>> newRanks = iteration
// join pages with outgoing edges and distribute rank
.join(adjacencyListInput).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
// collect and sum ranks
.groupBy(0).aggregate(SUM, 1)
// apply dampening factor
.map(new Dampener(numVertices));
iteration.closeWith(newRanks,
newRanks.join(iteration).where(0).equalTo(0).filter(new EpsilonFilter())) // termination condition
.writeAsCsv(outputPath);
DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
newRanks,
newRanks.join(iteration).where(0).equalTo(0)
// termination condition
.filter(new EpsilonFilter()));
// emit result
if(fileOutput) {
finalPageRanks.writeAsCsv(outputPath, "\n", " ");
} else {
finalPageRanks.print();
}
// execute program
env.execute("Basic Page Rank Example");
env.execute();
}
// --------------------------------------------------------------------------------------------
// The user-defined functions for parts of page rank
// --------------------------------------------------------------------------------------------
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
* A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
* originate. Run as a preprocessing step.
*/
public static final class OutgoingEdgeCounter extends GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
public static final class BuildOutgoingEdgeList extends GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
private final ArrayList<Long> neighbors = new ArrayList<Long>();
......@@ -151,4 +187,61 @@ public class SimplePageRank {
return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
}
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
private static boolean fileOutput = false;
private static String pageWithRankInputPath = null;
private static String edgeInputPath = null;
private static String outputPath = null;
private static int numVertices = 0;
private static int maxIterations = 10;
private static void parseParameters(String[] args) {
if(args.length > 0) {
if(args.length == 5) {
fileOutput = true;
pageWithRankInputPath = args[0];
edgeInputPath = args[1];
outputPath = args[2];
numVertices = Integer.parseInt(args[3]);
maxIterations = Integer.parseInt(args[4]);
} else {
System.err.println("Usage: PageRankBasic <vertex with initial rank input> <edges path> <output path> <num vertices> <num iterations>");
System.exit(1);
}
} else {
System.out.println("Executing PageRank Basic example with default parameters and built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" Usage: PageRankBasic <vertex with initial rank input> <edges path> <output path> <num vertices> <num iterations>");
numVertices = PageRankData.getNumberOfPages();
}
}
private static DataSet<Tuple2<Long, Double>> getPageWithRankDataSet(ExecutionEnvironment env) {
if(fileOutput) {
return env.readCsvFile(pageWithRankInputPath)
.fieldDelimiter(' ')
.lineDelimiter("\n")
.types(Long.class, Double.class);
} else {
return PageRankData.getDefaultPageWithRankDataSet(env);
}
}
private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
if(fileOutput) {
return env.readCsvFile(edgeInputPath)
.fieldDelimiter(' ')
.lineDelimiter("\n")
.types(Long.class, Long.class);
} else {
return PageRankData.getDefaultEdgeDataSet(env);
}
}
}
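The bodies of JoinVertexWithEdgesMatch and Dampener are not shown in this diff. Using the DAMPENING_FACTOR constant declared above, Dampener presumably applies the standard damped PageRank update; a sketch under that assumption (field access and constructor are assumed):

    /** Applies the dampening factor to a rank sum and re-adds the uniform jump probability. */
    public static final class Dampener extends MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
        private final double numVertices;

        public Dampener(double numVertices) {
            this.numVertices = numVertices;
        }

        @Override
        public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
            // rank' = d * sum(partial ranks) + (1 - d) / numVertices
            value.f1 = DAMPENING_FACTOR * value.f1 + (1 - DAMPENING_FACTOR) / numVertices;
            return value;
        }
    }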
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
......@@ -10,66 +9,45 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.example.java.wordcount;
import static eu.stratosphere.api.java.aggregation.Aggregations.SUM;
package eu.stratosphere.example.java.graph.util;
import java.util.ArrayList;
import java.util.List;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.util.Collector;
/**
* Implements the "WordCount" program which takes a collection of strings and counts the number of
* occurrences of each word in these strings. Finally, the result will be written to the
* console.
*/
@SuppressWarnings("serial")
public class WordCountCollection {
/**
* Runs the WordCount program.
*
* @param args Command line parameters, ignored.
*/
public static void main(String[] args) throws Exception {
// get the environment as starting point
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// define the strings to be analyzed
DataSet<String> text = env.fromElements("To be", "or not to be", "or to be still", "and certainly not to be not at all", "is that the question?");
public class ConnectedComponentsData {
public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
// split the strings into tuples of (word,1), group by field "0" and sum up field "1"
DataSet<Tuple2<String, Integer>> result = text.flatMap(new Tokenizer()).groupBy(0).aggregate(SUM, 1);
return env.fromElements(
1L, 2L, 3L, 4L, 5L,
6L, 7L, 8L, 9L, 10L,
11L, 12L, 13L, 14L, 15L);
}
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
// print the result on the console
result.print();
List<Tuple2<Long, Long>> data = new ArrayList<Tuple2<Long, Long>>();
data.add(new Tuple2<Long, Long>(1L, 2L));
data.add(new Tuple2<Long, Long>(2L, 3L));
data.add(new Tuple2<Long, Long>(2L, 4L));
data.add(new Tuple2<Long, Long>(3L, 5L));
data.add(new Tuple2<Long, Long>(6L, 7L));
data.add(new Tuple2<Long, Long>(8L, 9L));
data.add(new Tuple2<Long, Long>(8L, 10L));
data.add(new Tuple2<Long, Long>(5L, 11L));
data.add(new Tuple2<Long, Long>(11L, 12L));
data.add(new Tuple2<Long, Long>(10L, 13L));
data.add(new Tuple2<Long, Long>(9L, 14L));
data.add(new Tuple2<Long, Long>(13L, 14L));
data.add(new Tuple2<Long, Long>(1L, 15L));
// execute the defined program
env.execute();
return env.fromCollection(data);
}
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
*/
public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
......@@ -12,13 +12,14 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.example.java.triangles.util;
package eu.stratosphere.example.java.graph.util;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.example.java.triangles.util.EdgeDataTypes.Edge;
import eu.stratosphere.example.java.graph.util.EnumTrianglesDataTypes.Edge;
public class EdgeData {
public class EnumTrianglesData {
public static DataSet<Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) {
......
......@@ -12,14 +12,14 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.example.java.triangles.util;
import eu.stratosphere.api.java.functions.MapFunction;
package eu.stratosphere.example.java.graph.util;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple4;
public class EdgeDataTypes {
public class EnumTrianglesDataTypes {
public static class Edge extends Tuple2<Integer, Integer> {
private static final long serialVersionUID = 1L;
......@@ -110,17 +110,4 @@ public class EdgeDataTypes {
}
public static class TupleEdgeConverter extends MapFunction<Tuple2<Integer, Integer>, Edge> {
private static final long serialVersionUID = 1L;
private final Edge outEdge = new Edge();
@Override
public Edge map(Tuple2<Integer, Integer> t) throws Exception {
outEdge.copyVerticesFromTuple2(t);
return outEdge;
}
}
}
......@@ -12,61 +12,70 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.example.java.broadcastvar;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
package eu.stratosphere.example.java.graph.util;
import java.util.ArrayList;
import java.util.List;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.configuration.Configuration;
@SuppressWarnings("serial")
public class BroadcastVariableExample {
public static class ToUppercaseMapper extends MapFunction<String, String> {
// Lookup table for Strings to uppercase
private Set<String> toUppercase;
import eu.stratosphere.api.java.tuple.Tuple2;
@Override
public void open(Configuration parameters) throws Exception {
// You can access broadcast variables via `getRuntimeContext().getBroadcastVariable(String)`.
//
// The broadcasted variable is registered under the previously provided name and the data set is accessed
// as a Collection<T> over the broadcasted data set (where T is the type of the broadcasted DataSet<T>).
Collection<String> broadcastedData = getRuntimeContext().getBroadcastVariable("toUppercase");
public class PageRankData {
this.toUppercase = new HashSet<String>(broadcastedData);
}
@Override
public String map(String value) throws Exception {
return this.toUppercase.contains(value) ? value.toUpperCase() : value;
private static int numPages = 15;
public static DataSet<Tuple2<Long, Double>> getDefaultPageWithRankDataSet(ExecutionEnvironment env) {
double initRank = 1.0 / numPages;
List<Tuple2<Long, Double>> data = new ArrayList<Tuple2<Long, Double>>();
for(int i=0; i<numPages; i++) {
data.add(new Tuple2<Long, Double>(i+1L, initRank));
}
return env.fromCollection(data);
}
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// This example program takes the data set `lorem` and uppercases every String that is also contained in the
// `toUppercase` data set.
//
// The `toUppercase` data set is BROADCAST to the map operator, which creates a lookup table from it. The
// lookup table is then used in the map method to decide whether to uppercase a given String or not.
DataSet<String> toUppercase = env.fromElements("lorem", "ipsum");
DataSet<String> lorem = env.fromElements("lorem", "ipsum", "dolor", "sit", "amet");
lorem.map(new ToUppercaseMapper())
// You can broadcast a data set to an operator via `withBroadcastSet(DataSet<T>, String)`.
//
// The broadcast variable will be registered at the operator under the provided name.
.withBroadcastSet(toUppercase, "toUppercase")
.print();
env.execute("Broadcast Variable Example");
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
List<Tuple2<Long, Long>> data = new ArrayList<Tuple2<Long, Long>>();
data.add(new Tuple2<Long, Long>(1L, 2L));
data.add(new Tuple2<Long, Long>(1L, 15L));
data.add(new Tuple2<Long, Long>(2L, 3L));
data.add(new Tuple2<Long, Long>(2L, 4L));
data.add(new Tuple2<Long, Long>(2L, 5L));
data.add(new Tuple2<Long, Long>(2L, 6L));
data.add(new Tuple2<Long, Long>(2L, 7L));
data.add(new Tuple2<Long, Long>(3L, 13L));
data.add(new Tuple2<Long, Long>(4L, 2L));
data.add(new Tuple2<Long, Long>(5L, 11L));
data.add(new Tuple2<Long, Long>(5L, 12L));
data.add(new Tuple2<Long, Long>(6L, 1L));
data.add(new Tuple2<Long, Long>(6L, 7L));
data.add(new Tuple2<Long, Long>(6L, 8L));
data.add(new Tuple2<Long, Long>(7L, 1L));
data.add(new Tuple2<Long, Long>(7L, 8L));
data.add(new Tuple2<Long, Long>(8L, 1L));
data.add(new Tuple2<Long, Long>(8L, 9L));
data.add(new Tuple2<Long, Long>(8L, 10L));
data.add(new Tuple2<Long, Long>(9L, 14L));
data.add(new Tuple2<Long, Long>(9L, 1L));
data.add(new Tuple2<Long, Long>(10L, 1L));
data.add(new Tuple2<Long, Long>(10L, 13L));
data.add(new Tuple2<Long, Long>(11L, 12L));
data.add(new Tuple2<Long, Long>(11L, 1L));
data.add(new Tuple2<Long, Long>(12L, 1L));
data.add(new Tuple2<Long, Long>(13L, 14L));
data.add(new Tuple2<Long, Long>(14L, 12L));
data.add(new Tuple2<Long, Long>(15L, 1L));
return env.fromCollection(data);
}
public static int getNumberOfPages() {
return numPages;
}
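
	// A quick sanity sketch (plain Java, name illustrative): the default ranks above are
	// uniform, so the numPages values of 1.0/numPages sum to 1.0 up to floating-point
	// rounding, i.e. the initial ranks form a probability distribution.
	public static double sumOfDefaultRanks() {
		double sum = 0.0;
		for (int i = 0; i < numPages; i++) {
			sum += 1.0 / numPages;
		}
		return sum; // ~1.0
	}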
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.example.java.incremental.pagerank;
import java.util.Iterator;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.DeltaIteration;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.util.Collector;
public class SimpleDeltaPageRank {
public static final class RankComparisonMatch extends JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Double> join(Tuple2<Long, Double> vertexWithDelta,
Tuple2<Long, Double> vertexWithOldRank) throws Exception {
return new Tuple2<Long, Double>(vertexWithOldRank.f0, vertexWithDelta.f1 + vertexWithOldRank.f1);
}
}
// public static final class UpdateRankReduceDelta extends ReduceFunction<Tuple2<Long, Double>> {
//
// private static final long serialVersionUID = 1L;
//
// @Override
// public Tuple2<Long, Double> reduce(Tuple2<Long, Double> value1,
// Tuple2<Long, Double> value2) throws Exception {
//
// double rankSum = value1.f1 + value2.f1;
//
// // ignore small deltas
// if (Math.abs(rankSum) > 0.00001) {
// return new Tuple2<Long, Double>(value1.f0, rankSum);
// }
// return null;
// }
// }
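	// Why a GroupReduceFunction instead of the commented-out ReduceFunction: a reduce
	// must return a combined value for every pair it is handed, so it has no clean way
	// to drop near-zero deltas (returning null, as sketched above, is not a supported
	// way to filter records). The group reduce below sums the whole group and simply
	// collects nothing when the summed delta stays below the threshold.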
public static final class UpdateRankReduceDelta extends GroupReduceFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
private static final long serialVersionUID = 1L;
@Override
public void reduce(Iterator<Tuple2<Long, Double>> values,
Collector<Tuple2<Long, Double>> out) throws Exception {
double rankSum = 0.0;
Tuple2<Long, Double> currentTuple = null;
while (values.hasNext()) {
currentTuple = values.next();
rankSum += currentTuple.f1;
}
// ignore small deltas
if (Math.abs(rankSum) > 0.00001) {
out.collect(new Tuple2<Long, Double>(currentTuple.f0, rankSum));
}
}
}
public static final class PRDependenciesComputationMatchDelta extends JoinFunction<Tuple2<Long, Double>, Tuple3<Long, Long, Long>, Tuple2<Long, Double>> {
private static final long serialVersionUID = 1L;
/*
* (vId, rank) x (srcId, trgId, weight) => (trgId, rank / weight)
*/
@Override
public Tuple2<Long, Double> join(Tuple2<Long, Double> vertexWithRank,
Tuple3<Long, Long, Long> edgeWithWeight) throws Exception {
return new Tuple2<Long, Double>(edgeWithWeight.f1, (Double) (vertexWithRank.f1 / edgeWithWeight.f2));
}
}
public static final class Mapper extends MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Double> map(Tuple2<Long, Double> value)
throws Exception {
return value;
}
}
public static void main(String[] args) throws Exception {
if (args.length < 6) {
	System.err.println("Usage: SimpleDeltaPageRank <dop> <solutionSetInputPath> <deltasInputPath> <dependencySetInputPath> <outputPath> <numIterations>");
return;
}
final int dop = Integer.parseInt(args[0]);
final String solutionSetInputPath = args[1];
final String deltasInputPath = args[2];
final String dependencySetInputPath = args[3];
final String outputPath = args[4];
final int maxIterations = Integer.parseInt(args[5]);
run(dop, solutionSetInputPath, deltasInputPath, dependencySetInputPath, outputPath, maxIterations, true);
}
public static void run(int dop, String solutionSetInputPath, String deltasInputPath, String dependencySetInputPath, String outputPath, int maxIterations, boolean terminationCriterion) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(dop);
DataSet<Tuple2<Long, Double>> initialSolutionSet = env.readCsvFile(solutionSetInputPath).fieldDelimiter(' ').types(Long.class, Double.class).map(new Mapper());
DataSet<Tuple2<Long, Double>> initialDeltaSet = env.readCsvFile(deltasInputPath).fieldDelimiter(' ').types(Long.class, Double.class);
DataSet<Tuple3<Long, Long, Long>> dependencySetInput = env.readCsvFile(dependencySetInputPath).fieldDelimiter(' ').types(Long.class, Long.class, Long.class);
int keyPosition = 0;
DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialSolutionSet.iterateDelta(initialDeltaSet, maxIterations, keyPosition);
DataSet<Tuple2<Long, Double>> updateRanks = iteration.getWorkset().join(dependencySetInput).where(0).equalTo(0).with(new PRDependenciesComputationMatchDelta())
.groupBy(0).reduceGroup(new UpdateRankReduceDelta());
DataSet<Tuple2<Long, Double>> oldRankComparison = updateRanks.join(iteration.getSolutionSet()).where(0).equalTo(0).with(new RankComparisonMatch());
iteration.closeWith(oldRankComparison, updateRanks).writeAsCsv(outputPath);
env.execute();
}
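
	// The same wiring as run() above, condensed into one helper to show the delta-
	// iteration pattern in isolation (a sketch; key position 0 is the vertex id, and
	// the iteration also terminates early once the workset becomes empty):
	public static DataSet<Tuple2<Long, Double>> deltaPageRank(
			DataSet<Tuple2<Long, Double>> solutionSet,
			DataSet<Tuple2<Long, Double>> initialDeltas,
			DataSet<Tuple3<Long, Long, Long>> dependencies,
			int maxIterations) {

		DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
				solutionSet.iterateDelta(initialDeltas, maxIterations, 0);

		// distribute each vertex's delta over its out-edges and re-aggregate per target
		DataSet<Tuple2<Long, Double>> deltas = iteration.getWorkset()
				.join(dependencies).where(0).equalTo(0).with(new PRDependenciesComputationMatchDelta())
				.groupBy(0).reduceGroup(new UpdateRankReduceDelta());

		// add the surviving deltas to the previous ranks from the solution set
		DataSet<Tuple2<Long, Double>> newRanks = deltas
				.join(iteration.getSolutionSet()).where(0).equalTo(0).with(new RankComparisonMatch());

		// first argument updates the solution set, second becomes the next workset
		return iteration.closeWith(newRanks, deltas);
	}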
}
@@ -16,10 +16,9 @@ package eu.stratosphere.example.java.relational;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.aggregation.Aggregations;
import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple4;
@@ -28,19 +27,21 @@ import eu.stratosphere.api.java.tuple.Tuple6;
/**
* This program implements a modified version of the TPC-H query 10.
*
* The original query can be found at
* http://www.tpc.org/tpch/spec/tpch2.16.0.pdf (page 45).
* <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
*
* <p>
* This program implements the following SQL equivalent:
*
* <p>
* <code><pre>
* SELECT
* c_custkey,
* c_custkey,
* c_name,
* c_address,
* n_name,
* c_acctbal,
* sum(l_extendedprice * (1 - l_discount)) as revenue,
* SUM(l_extendedprice * (1 - l_discount)) AS revenue
* FROM
* customer,
* orders,
@@ -58,144 +59,178 @@ import eu.stratosphere.api.java.tuple.Tuple6;
* c_acctbal,
* n_name,
* c_address
* </pre></code>
*
* <p>
* Compared to the original TPC-H query this version does not print
* c_phone and c_comment, only filters by years greater than 1990 instead of
* a period of 3 months, and does not sort the result by revenue.
*
* <p>
* Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
*
* <p>
* This example shows how to use:
* <ul>
* <li> tuple data types
* <li> inline-defined functions
* <li> projection and join projection
* <li> built-in aggregation functions
* </ul>
*/
@SuppressWarnings("serial")
public class TPCHQuery10 {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
if (args.length < 5) {
System.err.println("Parameters: <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
return;
}
final String customerPath = args[0];
final String ordersPath = args[1];
final String lineitemPath = args[2];
final String nationPath = args[3];
final String outPath = args[4];
parseParameters(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read in customer table file
// customer: custkey, name, address, nationkey, acctbal
DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = env.readCsvFile(customerPath).fieldDelimiter('|')
.includeFields("11110100").types(Integer.class, String.class, String.class, Integer.class, Double.class);
// get customer data set: (custkey, name, address, nationkey, acctbal)
DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
// read in orders table file
// order: orderkey, custkey, orderdate
DataSet<Tuple3<Integer, Integer, String>> orders = env.readCsvFile(ordersPath).fieldDelimiter('|').includeFields("110010000")
.types(Integer.class, Integer.class, String.class);
// get orders data set: (orderkey, custkey, orderdate)
DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
// read in lineitem table file
// lineitem: orderkey, extendedprice, discount, returnflag
DataSet<Tuple4<Integer, Double, Double, String>> lineitems = env.readCsvFile(lineitemPath).fieldDelimiter('|')
.includeFields("1000011010000000").types(Integer.class, Double.class, Double.class, String.class);
// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
// read in nation table file
// nation: nationkey, name
DataSet<Tuple2<Integer, String>> nations = env.readCsvFile(nationPath).fieldDelimiter('|').includeFields("1100")
.types(Integer.class, String.class);
// get nation data set: (nationkey, name)
DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
// orders filtered by year: orderkey, custkey
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear = orders
// orders filtered by year: (orderkey, custkey)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
// filter by year
.filter(new FilterFunction<Tuple3<Integer,Integer, String>>() {
@Override
public boolean filter(Tuple3<Integer, Integer, String> t) {
int year = Integer.parseInt(t.f2.substring(0, 4));
return year > 1990;
}
})
// remove date as it is not necessary anymore
.map(new MapFunction<Tuple3<Integer,Integer,String>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Tuple3<Integer, Integer, String> t) {
return new Tuple2<Integer, Integer>(t.f0, t.f1);
}
});
orders.filter(
new FilterFunction<Tuple3<Integer,Integer, String>>() {
@Override
public boolean filter(Tuple3<Integer, Integer, String> t) {
int year = Integer.parseInt(t.f2.substring(0, 4));
return year > 1990;
}
})
// keep only the orderkey and custkey fields; the date is no longer required
.project(0,1).types(Integer.class, Integer.class);
// lineitems filtered by flag: orderkey, extendedprice, discount
DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag = lineitems
// lineitems filtered by flag: (orderkey, extendedprice, discount)
DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
// filter by flag
.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
@Override
public boolean filter(Tuple4<Integer, Double, Double, String> t)
throws Exception {
return t.f3.equals("R");
}
})
// remove flag as it is not necessary anymore
.map(new MapFunction<Tuple4<Integer, Double, Double, String>, Tuple3<Integer, Double, Double>>() {
@Override
public Tuple3<Integer, Double, Double> map(Tuple4<Integer, Double, Double, String> t) {
return new Tuple3<Integer, Double, Double>(t.f0, t.f1, t.f2);
}
});
lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
@Override
public boolean filter(Tuple4<Integer, Double, Double, String> t)
throws Exception {
return t.f3.equals("R");
}
})
// keep only orderkey, extendedprice, and discount; the flag is no longer required
.project(0,1,2).types(Integer.class, Double.class, Double.class);
// join orders with lineitems
// custkey, extendedprice, discount
DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey = ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
.where(0).equalTo(0)
.with(new JoinFunction<Tuple2<Integer, Integer>, Tuple3<Integer, Double, Double>, Tuple3<Integer, Double, Double>>() {
@Override
public Tuple3<Integer, Double, Double> join(Tuple2<Integer, Integer> o, Tuple3<Integer, Double, Double> l) {
return new Tuple3<Integer, Double, Double>(o.f1, l.f1, l.f2);
}
});
// join orders with lineitems: (custkey, extendedprice, discount)
DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
.where(0).equalTo(0)
.projectFirst(1).projectSecond(1,2)
.types(Integer.class, Double.class, Double.class);
// aggregate for revenue
// custkey, revenue
// aggregate for revenue: (custkey, revenue)
DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
// calculate the revenue for each item
.map(new MapFunction<Tuple3<Integer, Double, Double>, Tuple2<Integer, Double>>() {
@Override
public Tuple2<Integer, Double> map(Tuple3<Integer, Double, Double> t) {
// revenue per item = l_extendedprice * (1 - l_discount)
return new Tuple2<Integer, Double>(t.f0, t.f1 * (1 - t.f2));
}
})
@Override
public Tuple2<Integer, Double> map(Tuple3<Integer, Double, Double> t) {
// revenue per item = l_extendedprice * (1 - l_discount)
return new Tuple2<Integer, Double>(t.f0, t.f1 * (1 - t.f2));
}
})
// aggregate the revenues per item to revenue per customer
.groupBy(0).reduce(new ReduceFunction<Tuple2<Integer,Double>>() {
@Override
public Tuple2<Integer, Double> reduce(Tuple2<Integer, Double> t1, Tuple2<Integer, Double> t2) {
return new Tuple2<Integer, Double>(t1.f0, t1.f1+t2.f1);
}
});
.groupBy(0).aggregate(Aggregations.SUM, 1);
// join customer with nation
// custkey, name, address, nationname, acctbal
// join customer with nation (custkey, name, address, nationname, acctbal)
DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
.joinWithTiny(nations)
.where(3)
.equalTo(0)
.with(new JoinFunction<Tuple5<Integer, String, String, Integer, Double>, Tuple2<Integer, String>, Tuple5<Integer, String, String, String, Double>>() {
@Override
public Tuple5<Integer, String, String, String, Double> join(Tuple5<Integer, String, String, Integer, Double> c, Tuple2<Integer, String> n) {
return new Tuple5<Integer, String, String, String, Double>(c.f0, c.f1, c.f2, n.f1, c.f4);
}
});
.joinWithTiny(nations)
.where(3).equalTo(0)
.projectFirst(0,1,2).projectSecond(1).projectFirst(4)
.types(Integer.class, String.class, String.class, String.class, Double.class);
// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
customerWithNation.join(revenueOfCustomerKey)
.where(0).equalTo(0)
.projectFirst(0,1,2,3,4).projectSecond(1)
.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
// join customer (with nation) with revenue
// custkey, name, address, nationname, acctbal, revenue
DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue = customerWithNation
.join(revenueOfCustomerKey)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple5<Integer, String, String, String, Double>, Tuple2<Integer, Double>, Tuple6<Integer, String, String, String, Double, Double>>() {
@Override
public Tuple6<Integer, String, String, String, Double, Double> join(Tuple5<Integer, String, String, String, Double> c, Tuple2<Integer, Double> r) {
return new Tuple6<Integer, String, String, String, Double, Double>(c.f0, c.f1, c.f2, c.f3, c.f4, r.f1);
}
});
// emit result
customerWithRevenue.writeAsCsv(outputPath);
// execute program
env.execute("TPCH Query 10 Example");
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
private static String customerPath;
private static String ordersPath;
private static String lineitemPath;
private static String nationPath;
private static String outputPath;
private static void parseParameters(String[] programArguments) {
if(programArguments.length > 0) {
if(programArguments.length == 5) {
customerPath = programArguments[0];
ordersPath = programArguments[1];
lineitemPath = programArguments[2];
nationPath = programArguments[3];
outputPath = programArguments[4];
} else {
System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
System.exit(1);
}
} else {
System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
" Due to legal restrictions, we can not ship generated data.\n" +
" You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
" Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
System.exit(1);
}
}
private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
return env.readCsvFile(customerPath)
.fieldDelimiter('|')
.includeFields("11110100")
.types(Integer.class, String.class, String.class, Integer.class, Double.class);
}
private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
return env.readCsvFile(ordersPath)
.fieldDelimiter('|')
.includeFields("110010000")
.types(Integer.class, Integer.class, String.class);
}
// write the result and execute
customerWithRevenue.writeAsCsv(outPath);
env.execute();
private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
return env.readCsvFile(lineitemPath)
.fieldDelimiter('|')
.includeFields("1000011010000000")
.types(Integer.class, Double.class, Double.class, String.class);
}
private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
return env.readCsvFile(nationPath)
.fieldDelimiter('|')
.includeFields("1100")
.types(Integer.class, String.class);
}
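
	// A self-contained sketch of the join-projection construct used above: instead of a
	// JoinFunction, projectFirst/projectSecond assemble the output tuple directly. The
	// method name and the two (id, value) inputs are illustrative only.
	public static void joinProjectionExample(ExecutionEnvironment env) throws Exception {
		DataSet<Tuple2<Integer, String>> names = env.fromElements(
				new Tuple2<Integer, String>(1, "alice"),
				new Tuple2<Integer, String>(2, "bob"));
		DataSet<Tuple2<Integer, Double>> scores = env.fromElements(
				new Tuple2<Integer, Double>(1, 0.5),
				new Tuple2<Integer, Double>(2, 0.8));

		// join on the id fields and emit (id, name, score) without writing a JoinFunction
		names.join(scores)
				.where(0).equalTo(0)
				.projectFirst(0, 1).projectSecond(1)
				.types(Integer.class, String.class, Double.class)
				.print();
	}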
}
@@ -23,100 +23,196 @@ import java.util.Date;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.aggregation.Aggregations;
import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.api.java.tuple.Tuple5;
/**
* This program implements a modified version of the TPC-H query 3. The
* example demonstrates how to assign names to fields by extending the Tuple class.
* This program implements a modified version of the TPC-H query 3. The
* example demonstrates how to assign names to fields by extending the Tuple class.
* The original query can be found at
* <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 29).
*
* The original query can be found at
* http://www.tpc.org/tpch/spec/tpch2.16.0.pdf (page 29).
* <p>
* This program implements the following SQL equivalent:
*
* This program implements the following SQL equivalent:
* <p>
* <code><pre>
* SELECT
* l_orderkey,
* SUM(l_extendedprice*(1-l_discount)) AS revenue,
* o_orderdate,
* o_shippriority
* FROM customer,
* orders,
* lineitem
* WHERE
* c_mktsegment = '[SEGMENT]'
* AND c_custkey = o_custkey
* AND l_orderkey = o_orderkey
* AND o_orderdate < date '[DATE]'
* AND l_shipdate > date '[DATE]'
* GROUP BY
* l_orderkey,
* o_orderdate,
* o_shippriority;
* </pre></code>
*
* select l_orderkey,
* sum(l_extendedprice*(1-l_discount)) as revenue,
* o_orderdate,
* o_shippriority from customer,
* orders,
* lineitem
* where
* c_mktsegment = '[SEGMENT]' and
* c_custkey = o_custkey and
* l_orderkey = o_orderkey and
* o_orderdate < date '[DATE]' and
* l_shipdate > date '[DATE]'
* group by
* l_orderkey,
* o_orderdate,
* o_shippriority
* order by //not yet
* revenue desc,
* o_orderdate;
* <p>
* Compared to the original TPC-H query this version does not sort the result by revenue
* and orderdate.
*
* <p>
* Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
*
* <p>
* This example shows how to use:
* <ul>
* <li> custom data type derived from tuple data types
* <li> inline-defined functions
* <li> built-in aggregation functions
* </ul>
*/
@SuppressWarnings("serial")
public class TPCHQuery3 {
// --------------------------------------------------------------------------------------------
// Custom type classes
// --------------------------------------------------------------------------------------------
// *************************************************************************
// PROGRAM
// *************************************************************************
public static class Lineitem extends Tuple4<Integer, Double, Double, String> {
public static void main(String[] args) throws Exception {
parseParameters(args);
public Integer getOrderkey() {
return this.f0;
}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
public Double getDiscount() {
return this.f2;
}
// get input data
DataSet<Lineitem> li = getLineitemDataSet(env);
DataSet<Order> or = getOrdersDataSet(env);
DataSet<Customer> cust = getCustomerDataSet(env);
// Filter market segment "AUTOMOBILE"
cust = cust.filter(
new FilterFunction<Customer>() {
@Override
public boolean filter(Customer value) {
return value.getMktsegment().equals("AUTOMOBILE");
}
});
public Double getExtendedprice() {
return this.f1;
}
// Filter all Orders with o_orderdate < 12.03.1995
or = or.filter(
new FilterFunction<Order>() {
private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private Date date;
{
Calendar cal = Calendar.getInstance();
cal.set(1995, Calendar.MARCH, 12); // Calendar months are zero-based; 3 would mean April
date = cal.getTime();
}
@Override
public boolean filter(Order value) throws ParseException {
Date orderDate = format.parse(value.getOrderdate());
return orderDate.before(date);
}
});
// Filter all Lineitems with l_shipdate > 12.03.1995
li = li.filter(
new FilterFunction<Lineitem>() {
private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private Date date;
{
Calendar cal = Calendar.getInstance();
cal.set(1995, Calendar.MARCH, 12); // Calendar months are zero-based; 3 would mean April
date = cal.getTime();
}
@Override
public boolean filter(Lineitem value) throws ParseException {
Date shipDate = format.parse(value.getShipdate());
return shipDate.after(date);
}
});
public String getShipdate() {
return this.f3;
}
// Join customers with orders and package them into a ShippingPriorityItem
DataSet<ShippingPriorityItem> customerWithOrders =
cust.join(or)
.where(0)
.equalTo(0)
.with(
new JoinFunction<Customer, Order, ShippingPriorityItem>() {
@Override
public ShippingPriorityItem join(Customer first, Order second) {
return new ShippingPriorityItem(0, 0.0, second.getOrderdate(),
second.getShippriority(), second.getOrderkey());
}
});
// Join the last join result with Lineitems
DataSet<ShippingPriorityItem> joined =
customerWithOrders.join(li)
.where(4)
.equalTo(0)
.with(
new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
@Override
public ShippingPriorityItem join(ShippingPriorityItem first, Lineitem second) {
first.setL_Orderkey(second.getOrderkey());
first.setRevenue(second.getExtendedprice() * (1 - second.getDiscount()));
return first;
}
});
// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
joined = joined
.groupBy(0, 2, 3)
.aggregate(Aggregations.SUM, 1);
// emit result
joined.writeAsCsv(outputPath, "\n", "|");
// execute program
env.execute("TPCH Query 3 Example");
}
// *************************************************************************
// DATA TYPES
// *************************************************************************
public static class Lineitem extends Tuple4<Integer, Double, Double, String> {
public Integer getOrderkey() { return this.f0; }
public Double getDiscount() { return this.f2; }
public Double getExtendedprice() { return this.f1; }
public String getShipdate() { return this.f3; }
}
public static class Customer extends Tuple2<Integer, String> {
public Integer getCustKey() {
return this.f0;
}
public String getMktsegment() {
return this.f1;
}
public Integer getCustKey() { return this.f0; }
public String getMktsegment() { return this.f1; }
}
public static class Order extends Tuple3<Integer, String, Integer> {
public Integer getOrderkey() {
return this.f0;
}
public String getOrderdate() {
return this.f1;
}
public Integer getShippriority() {
return this.f2;
}
public Integer getOrderkey() { return this.f0; }
public String getOrderdate() { return this.f1; }
public Integer getShippriority() { return this.f2; }
}
public static class ShippingPriorityItem extends Tuple5<Integer, Double, String, Integer, Integer> {
public ShippingPriorityItem() {
}
public ShippingPriorityItem() { }
public ShippingPriorityItem(Integer l_orderkey, Double revenue,
String o_orderdate, Integer o_shippriority, Integer o_orderkey) {
@@ -127,175 +223,65 @@ public class TPCHQuery3 {
this.f4 = o_orderkey;
}
public Integer getL_Orderkey() {
return this.f0;
}
public void setL_Orderkey(Integer l_orderkey) {
this.f0 = l_orderkey;
}
public Double getRevenue() {
return this.f1;
}
public void setRevenue(Double revenue) {
this.f1 = revenue;
}
public String getOrderdate() {
return this.f2;
}
public Integer getShippriority() {
return this.f3;
}
public Integer getO_Orderkey() {
return this.f4;
}
public Integer getL_Orderkey() { return this.f0; }
public void setL_Orderkey(Integer l_orderkey) { this.f0 = l_orderkey; }
public Double getRevenue() { return this.f1; }
public void setRevenue(Double revenue) { this.f1 = revenue; }
public String getOrderdate() { return this.f2; }
public Integer getShippriority() { return this.f3; }
public Integer getO_Orderkey() { return this.f4; }
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
// --------------------------------------------------------------------------------------------
// Query program
// --------------------------------------------------------------------------------------------
private static String lineitemPath;
private static String customerPath;
private static String ordersPath;
private static String outputPath;
/*
* This example TPCH3 query uses custom objects.
*/
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Parameters: <lineitem.tbl> <customer.tbl> <orders.tbl> [<output>].");
return;
}
final String lineitemPath = args[0];
final String customerPath = args[1];
final String ordersPath = args[2];
final String resultPath = args.length >= 4 ? args[4] : null;
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
/*
* Read Data from files
*/
DataSet<Lineitem> li = env
.readCsvFile(lineitemPath).fieldDelimiter('|')
.includeFields("1000011000100000")
.tupleType(Lineitem.class);
DataSet<Order> or = env
.readCsvFile(ordersPath).fieldDelimiter('|')
.includeFields("100010010")
.tupleType(Order.class);
DataSet<Customer> cust = env
.readCsvFile(customerPath).fieldDelimiter('|')
.includeFields("10000010")
.tupleType(Customer.class);
private static void parseParameters(String[] programArguments) {
/*
* Filter market segment "AUTOMOBILE"
*/
cust = cust.filter(new FilterFunction<Customer>() {
@Override
public boolean filter(Customer value) {
return value.getMktsegment().equals("AUTOMOBILE");
}
});
/*
* Filter all Orders with o_orderdate < 12.03.1995
*/
or = or.filter(new FilterFunction<Order>() {
private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private Date date;
{
Calendar cal = Calendar.getInstance();
cal.set(1995, 3, 12);
date = cal.getTime();
if(programArguments.length > 0) {
if(programArguments.length == 4) {
lineitemPath = programArguments[0];
customerPath = programArguments[1];
ordersPath = programArguments[2];
outputPath = programArguments[3];
} else {
System.err.println("Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
System.exit(1);
}
@Override
public boolean filter(Order value) throws ParseException {
Date orderDate = format.parse(value.getOrderdate());
return orderDate.before(date);
}
});
/*
* Filter all Lineitems with l_shipdate > 12.03.1995
*/
li = li.filter(new FilterFunction<Lineitem>() {
private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private Date date;
{
Calendar cal = Calendar.getInstance();
cal.set(1995, 3, 12);
date = cal.getTime();
}
@Override
public boolean filter(Lineitem value) throws ParseException {
Date shipDate = format.parse(value.getShipdate());
return shipDate.after(date);
}
});
/*
* Join customers with orders and package them into a ShippingPriorityItem
*/
DataSet<ShippingPriorityItem> customerWithOrders = cust
.join(or)
.where(0)
.equalTo(0)
.with(new JoinFunction<Customer, Order, ShippingPriorityItem>() {
@Override
public ShippingPriorityItem join(Customer first, Order second) {
return new ShippingPriorityItem(0, 0.0, second.getOrderdate(),
second.getShippriority(), second.getOrderkey());
}
});
/*
* Join the last join result with Lineitems
*/
DataSet<ShippingPriorityItem> joined = customerWithOrders
.join(li)
.where(4)
.equalTo(0)
.with(new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
@Override
public ShippingPriorityItem join(ShippingPriorityItem first, Lineitem second) {
first.setL_Orderkey(second.getOrderkey());
first.setRevenue(second.getExtendedprice() * (1 - second.getDiscount()));
return first;
}
});
/*
* GroupBy l_orderkey, o_orderdate and o_shippriority
* After that, the reduce function calculates the revenue.
*/
joined = joined
.groupBy(0, 2, 3)
.reduce(new ReduceFunction<TPCHQuery3.ShippingPriorityItem>() {
@Override
public ShippingPriorityItem reduce(ShippingPriorityItem value1, ShippingPriorityItem value2) {
value1.setRevenue(value1.getRevenue() + value2.getRevenue());
return value1;
}
});
if (resultPath == null) {
joined.print();
} else {
joined.writeAsCsv(resultPath, "\n", "|");
System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
" Due to legal restrictions, we can not ship generated data.\n" +
" You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
" Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
System.exit(1);
}
env.execute();
}
private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env) {
return env.readCsvFile(lineitemPath)
.fieldDelimiter('|')
.includeFields("1000011000100000")
.tupleType(Lineitem.class);
}
private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) {
return env.readCsvFile(customerPath)
.fieldDelimiter('|')
.includeFields("10000010")
.tupleType(Customer.class);
}
private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
return env.readCsvFile(ordersPath)
.fieldDelimiter('|')
.includeFields("100010010")
.tupleType(Order.class);
}
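
	// A minimal sketch of the pattern this example demonstrates: extending a Tuple type
	// to give its positional fields readable names. Class and fields are illustrative;
	// such a type can be filled directly from CSV via tupleType(...), as done for
	// Lineitem, Customer and Order above.
	public static class Item extends Tuple2<Integer, Double> {
		private static final long serialVersionUID = 1L;
		public Integer getId() { return this.f0; }
		public Double getPrice() { return this.f1; }
	}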
}
@@ -13,6 +13,8 @@
package eu.stratosphere.example.java.relational;
import java.util.Iterator;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.CoGroupFunction;
@@ -20,57 +22,135 @@ import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.example.java.relational.util.WebLogData;
import eu.stratosphere.example.java.relational.util.WebLogDataGenerator;
import eu.stratosphere.util.Collector;
import java.util.Iterator;
/**
* Implements the following relational OLAP query as Stratosphere program:
* This program processes web logs and relational data.
* It implements the following relational query:
*
* <code><pre>
* SELECT r.pageURL, r.pageRank, r.avgDuration
* FROM Documents d JOIN Rankings r
* ON d.url = r.url
* WHERE CONTAINS(d.text, [keywords])
* AND r.rank > [rank]
* AND NOT EXISTS (
* SELECT * FROM Visits v
* WHERE v.destUrl = d.url
* AND v.visitDate < [date]);
* </pre></code>
* SELECT
* r.pageURL,
* r.pageRank,
* r.avgDuration
* FROM documents d JOIN rankings r
* ON d.url = r.url
* WHERE CONTAINS(d.text, [keywords])
* AND r.rank > [rank]
* AND NOT EXISTS
* (
* SELECT * FROM Visits v
* WHERE v.destUrl = d.url
* AND v.visitDate < [date]
* );
* </pre></code>
*
* Table Schemas: <code><pre>
* <p>
* Input files are plain text CSV files using the pipe character ('|') as field separator.
* The tables referenced in the query can be generated using the {@link WebLogDataGenerator} and
* have the following schemas:
* <code><pre>
* CREATE TABLE Documents (
* url VARCHAR(100) PRIMARY KEY,
* contents TEXT );
* url VARCHAR(100) PRIMARY KEY,
* contents TEXT );
*
* CREATE TABLE Rankings (
* pageRank INT,
* pageURL VARCHAR(100) PRIMARY KEY,
* avgDuration INT );
* pageRank INT,
* pageURL VARCHAR(100) PRIMARY KEY,
* avgDuration INT );
*
* CREATE TABLE Visits (
* sourceIP VARCHAR(16),
* destURL VARCHAR(100),
* visitDate DATE,
* adRevenue FLOAT,
* userAgent VARCHAR(64),
* countryCode VARCHAR(3),
* languageCode VARCHAR(6),
* searchWord VARCHAR(32),
* duration INT );
* sourceIP VARCHAR(16),
* destURL VARCHAR(100),
* visitDate DATE,
* adRevenue FLOAT,
* userAgent VARCHAR(64),
* countryCode VARCHAR(3),
* languageCode VARCHAR(6),
* searchWord VARCHAR(32),
* duration INT );
* </pre></code>
*
* <p>
* This example shows how to use:
* <ul>
* <li> tuple data types
* <li> projection and join projection
* <li> the CoGroup transformation for an anti-join
* </ul>
*
*/
@SuppressWarnings("serial")
public class WebLogAnalysis {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
parseParameters(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
DataSet<Tuple2<String, String>> documents = getDocumentsDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> ranks = getRanksDataSet(env);
DataSet<Tuple2<String, String>> visits = getVisitsDataSet(env);
// Create DataSet for filtering the entries from the documents relation
DataSet<Tuple1<String>> filterDocs = documents
.filter(new FilterDocs())
.project(0).types(String.class);
// Create DataSet for filtering the entries from the ranks relation
DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
.filter(new FilterRanks());
// Create DataSet for filtering the entries from the visits relation
DataSet<Tuple1<String>> filterVisits = visits
.filter(new FilterVisits())
.project(0).types(String.class);
// Create DataSet to join the filtered documents and ranks relation
DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
filterDocs.join(filterRanks)
.where(0).equalTo(1)
.projectSecond(0,1,2)
.types(Integer.class, String.class, Integer.class);
// Create DataSet to realize an anti-join between the joined
// documents and ranks relation and the filtered visits relation
DataSet<Tuple3<Integer, String, Integer>> result =
joinDocsRanks.coGroup(filterVisits)
.where(1).equalTo(0)
.with(new AntiJoinVisits());
// emit result
if(fileOutput) {
result.writeAsCsv(outputPath, "\n", "|");
} else {
result.print();
}
// execute program
env.execute("WebLogAnalysis Example");
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
* FilterFunction that filters for documents that contain a certain set of
* keywords.
*/
public static class FilterDocs extends FilterFunction<Tuple2<String, String>> {
private static final String[] KEYWORDS = { " editors ", " oscillations ", " convection " };
private static final String[] KEYWORDS = { " editors ", " oscillations " };
/**
* Filters for documents that contain all of the given keywords and projects the records on the URL field.
@@ -98,7 +178,7 @@ public class WebLogAnalysis {
*/
public static class FilterRanks extends FilterFunction<Tuple3<Integer, String, Integer>> {
private static final int RANKFILTER = 50;
private static final int RANKFILTER = 40;
/**
* Filters for records of the rank relation where the rank is greater
@@ -121,7 +201,7 @@ public class WebLogAnalysis {
*/
public static class FilterVisits extends FilterFunction<Tuple2<String, String>> {
private static final int YEARFILTER = 2010;
private static final int YEARFILTER = 2007;
/**
* Filters for records of the visits relation where the year of visit is equal to a
@@ -169,76 +249,68 @@ public class WebLogAnalysis {
}
}
public static void main(String[] args) throws Exception {
if (args.length < 4) {
System.err.println("Parameters: <docs> <ranks> <visits> <output>.");
System.err.println(" If <output> is \"STDOUT\", prints result to the command line.");
return;
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
private static boolean fileOutput = false;
private static String documentsPath;
private static String ranksPath;
private static String visitsPath;
private static String outputPath;
private static void parseParameters(String[] args) {
String docsInput = args[0];
String ranksInput = args[1];
String visitsInput = args[2];
String output = args[3];
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
/*
* Output Format:
* 0: URL
* 1: DOCUMENT_TEXT
*/
// Create DataSet for documents relation
DataSet<Tuple2<String, String>> docs = env.readCsvFile(docsInput)
.fieldDelimiter('|')
.types(String.class, String.class);
/*
* Output Format:
* 0: RANK
* 1: URL
* 2: AVG_DURATION
*/
// Create DataSet for ranks relation
DataSet<Tuple3<Integer, String, Integer>> ranks = env.readCsvFile(ranksInput)
.fieldDelimiter('|')
.types(Integer.class, String.class, Integer.class);
/*
* Output Format:
* 0: URL
* 1: DATE
*/
// Create DataSet for visits relation
DataSet<Tuple2<String, String>> visits = env.readCsvFile(visitsInput)
.fieldDelimiter('|')
.includeFields("011000000")
.types(String.class, String.class);
// Create DataSet for filtering the entries from the documents relation
DataSet<Tuple1<String>> filterDocs = docs.filter(new FilterDocs()).project(0).types(String.class);
// Create DataSet for filtering the entries from the ranks relation
DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks.filter(new FilterRanks());
// Create DataSet for filtering the entries from the visits relation
DataSet<Tuple1<String>> filterVisits = visits.filter(new FilterVisits()).project(0).types(String.class);
// Create DataSet to join the filtered documents and ranks relation
DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks = filterDocs.join(filterRanks)
.where(0).equalTo(1).projectSecond(0,1,2).types(Integer.class, String.class, Integer.class);
// Create DataSet to realize an anti-join between the joined
// documents and ranks relation and the filtered visits relation
DataSet<Tuple3<Integer, String, Integer>> antiJoinVisits = joinDocsRanks.coGroup(filterVisits)
.where(1).equalTo(0).with(new AntiJoinVisits());
if (output.equals("STDOUT")) {
antiJoinVisits.print();
if(args.length > 0) {
fileOutput = true;
if(args.length == 4) {
documentsPath = args[0];
ranksPath = args[1];
visitsPath = args[2];
outputPath = args[3];
} else {
System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
System.exit(1);
}
} else {
antiJoinVisits.writeAsCsv(output, "\n", "|");
System.out.println("Executing WebLog Analysis example with built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
}
}
private static DataSet<Tuple2<String, String>> getDocumentsDataSet(ExecutionEnvironment env) {
// Create DataSet for documents relation (URL, Doc-Text)
if(fileOutput) {
return env.readCsvFile(documentsPath)
.fieldDelimiter('|')
.types(String.class, String.class);
} else {
return WebLogData.getDocumentDataSet(env);
}
}
private static DataSet<Tuple3<Integer, String, Integer>> getRanksDataSet(ExecutionEnvironment env) {
// Create DataSet for ranks relation (Rank, URL, Avg-Visit-Duration)
if(fileOutput) {
return env.readCsvFile(ranksPath)
.fieldDelimiter('|')
.types(Integer.class, String.class, Integer.class);
} else {
return WebLogData.getRankDataSet(env);
}
}
env.execute();
private static DataSet<Tuple2<String, String>> getVisitsDataSet(ExecutionEnvironment env) {
// Create DataSet for visits relation (URL, Date)
if(fileOutput) {
return env.readCsvFile(visitsPath)
.fieldDelimiter('|')
.includeFields("011000000")
.types(String.class, String.class);
} else {
return WebLogData.getVisitDataSet(env);
}
}
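
	// A condensed sketch of the coGroup-based anti-join used above (cf. AntiJoinVisits):
	// emit the records of the left group only if the right group is empty. Types and
	// names are illustrative; the signature follows the CoGroupFunction imported in
	// this file.
	public static class AntiJoinSketch extends CoGroupFunction<Tuple1<String>, Tuple1<String>, Tuple1<String>> {
		private static final long serialVersionUID = 1L;

		@Override
		public void coGroup(Iterator<Tuple1<String>> left, Iterator<Tuple1<String>> right,
				Collector<Tuple1<String>> out) {
			// no matching record on the right side -> keep all left records
			if (!right.hasNext()) {
				while (left.hasNext()) {
					out.collect(left.next());
				}
			}
		}
	}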
}
@@ -11,38 +11,48 @@
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.example.java.relational.generator;
package eu.stratosphere.example.java.relational.util;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Calendar;
import java.util.Random;
public class WebLogGenerator {
import eu.stratosphere.example.java.relational.WebLogAnalysis;
/**
* Data generator for the {@link WebLogAnalysis} example program.
*
*/
public class WebLogDataGenerator {
/**
* Main method to generate data for the {@link WebLogAnalysis} example program.
* <p>
* The generator creates three files:
* <ul>
* <li><code>{tmp.dir}/documents</code> for the web documents
* <li><code>{tmp.dir}/ranks</code> for the ranks of the web documents
* <li><code>{tmp.dir}/visits</code> for the logged visits of web documents
* </ul>
*
* @param args
* <ol>
* <li>Int: Number of web documents
* <li>Int: Number of visits
* </ol>
*/
public static void main(String[] args) {
if (args.length != 4) {
if (args.length == 0 || args[0].equals("-h")
|| args[0].equals("--help")) {
// Show help
System.out.println("Usage:");
System.out.println("1:\tWith parameters");
System.out.println("\t<Generator> "
+ "[noDocuments] [noVisits] [outPath] [noFiles]");
System.out.println("2:\tDefault parameters");
System.out.println("\t<Generator> -d");
return;
} else if (args[0].equals("-d")) {
// Default values
args = new String[4];
args[0] = "1000"; // number of documents
args[1] = "10000"; // number of visits
args[2] = "/tmp/stratosphere/"; // path
args[3] = "1"; // number of files
}
// parse parameters
if (args.length < 2) {
System.out.println("WebLogDataGenerator <numberOfDocuments> <numberOfVisits>");
System.exit(1);
}
int noDocs = Integer.parseInt(args[0]);
int noVisits = Integer.parseInt(args[1]);
String[] filterKWs = { "editors", "oscillations", "convection" };
String[] words = { "Lorem", "ipsum", "dolor", "sit", "amet",
@@ -53,18 +63,15 @@ public class WebLogGenerator {
"ullamcorper", "suscipit", "lobortis", "nisl", "ut", "aliquip",
"ex", "ea", "commodo" };
int noDocs = Integer.parseInt(args[0]);
int noVisits = Integer.parseInt(args[1]);
String path = args[2];
int noFiles = Integer.parseInt(args[3]);
final String outPath = System.getProperty("java.io.tmpdir");
System.out.println("Generating documents files...");
genDocs(noDocs, noFiles, filterKWs, words, path + "docs_");
genDocs(noDocs, filterKWs, words, outPath + "/documents");
System.out.println("Generating ranks files...");
genRanks(noDocs, noFiles, path + "ranks_");
genRanks(noDocs, outPath + "/ranks");
System.out.println("Generating visits files...");
genVisits(noVisits, noDocs, noFiles, path + "visits_");
genVisits(noVisits, noDocs, outPath + "/visits");
System.out.println("Done!");
}
@@ -76,8 +83,6 @@
*
* @param noDocs
* Number of entries for the documents relation
* @param noFiles
* Number of files for the documents relation
* @param filterKeyWords
* A list of keywords that should be contained
* @param words
@@ -85,16 +90,12 @@
* @param path
* Output path for the documents relation
*/
public static void genDocs(int noDocs, int noFiles,
String[] filterKeyWords, String[] words, String path) {
private static void genDocs(int noDocs, String[] filterKeyWords, String[] words, String path) {
Random rand = new Random(Calendar.getInstance().getTimeInMillis());
int fileId = 0;
int docsPerFile = (noDocs / noFiles) + 1;
int docsInFile = 0;
try {
FileWriter fw = new FileWriter(path + (fileId++));
FileWriter fw = new FileWriter(path);
for (int i = 0; i < noDocs; i++) {
@@ -102,8 +103,8 @@ public class WebLogGenerator {
// URL
StringBuilder doc = new StringBuilder("url_" + i + "|");
for (int j = 0; j < wordsInDoc; j++) {
if (rand.nextDouble() > 0.98) {
// Approx. every 50th word is a keyword
if (rand.nextDouble() > 0.9) {
// Approx. every 10th word is a keyword
doc.append(filterKeyWords[rand
.nextInt(filterKeyWords.length)] + " ");
} else {
@@ -114,13 +115,6 @@
doc.append("|\n");
fw.write(doc.toString());
docsInFile++;
if (docsInFile == docsPerFile) {
fw.close();
fw = new FileWriter(path + (fileId++));
docsInFile = 0;
}
}
fw.close();
@@ -136,23 +130,17 @@
*
* @param noDocs
* Number of entries in the documents relation
* @param noFiles
* Number of files for the ranks relation
* @param path
* Output path for the ranks relation
*/
public static void genRanks(int noDocs, int noFiles, String path) {
private static void genRanks(int noDocs, String path) {
Random rand = new Random(Calendar.getInstance().getTimeInMillis());
int fileId = 0;
int docsPerFile = (noDocs / noFiles) + 1;
int docsInFile = 0;
try {
FileWriter fw = new FileWriter(path + (fileId++));
FileWriter fw = new FileWriter(path);
for (int i = 0; i < noDocs; i++) {
// Rank
StringBuilder rank = new StringBuilder(rand.nextInt(100) + "|");
// URL
@@ -161,13 +149,6 @@
rank.append(rand.nextInt(10) + rand.nextInt(50) + "|\n");
fw.write(rank.toString());
docsInFile++;
if (docsInFile == docsPerFile) {
fw.close();
fw = new FileWriter(path + (fileId++));
docsInFile = 0;
}
}
fw.close();
@@ -185,20 +166,15 @@
* Number of entries for the visits relation
* @param noDocs
* Number of entries in the documents relation
* @param noFiles
* Number of files for the visits relation
* @param path
* Output path for the visits relation
*/
public static void genVisits(int noVisits, int noDocs, int noFiles, String path) {
private static void genVisits(int noVisits, int noDocs, String path) {
Random rand = new Random(Calendar.getInstance().getTimeInMillis());
int fileId = 0;
int visitsPerFile = (noVisits / noFiles) + 1;
int visitsInFile = 0;
try {
FileWriter fw = new FileWriter(path + (fileId++));
FileWriter fw = new FileWriter(path);
for (int i = 0; i < noVisits; i++) {
@@ -218,13 +194,6 @@
visit.append("0.12|Mozilla Firefox 3.1|de|de|Nothing special|124|\n");
fw.write(visit.toString());
visitsInFile++;
if (visitsInFile == visitsPerFile) {
fw.close();
fw = new FileWriter(path + (fileId++));
visitsInFile = 0;
}
}
fw.close();
......
@@ -19,49 +19,64 @@ import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.aggregation.Aggregations;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.example.java.wordcount.util.WordCountData;
import eu.stratosphere.util.Collector;
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram
* over text files. The histogram is written back to disk as '(word, count)' pairs.
* over text files.
*
* <p>
* The input are plain text files.
*
* <p>
* This example shows how to:
* <ul>
* <li>write a simple Stratosphere program.
* <li>use Tuple data types.
* <li>write and use user-defined functions.
* </ul>
*
*/
@SuppressWarnings("serial")
public class WordCount {
/**
* Runs the WordCount program. Accepts parameters: <input file path> <result file path>.
* Paths must be qualified URIs, i.e., start with "file://..." or "hdfs://...".
*
* @param args Parameters defining the input and output path.
*/
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: WordCount <input path> <result path>");
return;
}
final String inputPath = args[0];
final String outputPath = args[1];
parseParameters(args);
// get the environment as starting point
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read the text file from given input path
DataSet<String> text = env.readTextFile(inputPath);
// split up the lines in pairs (2-tuples) containing: (word,1)
DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
// get input data
DataSet<String> text = getTextDataSet(env);
// group by the tuple field "0" and sum up tuple field "1"
DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
// write out the result
result.writeAsText(outputPath);
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.aggregate(Aggregations.SUM, 1);
// emit result
if(fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
} else {
counts.print();
}
// execute the defined program
env.execute("Word Count");
// execute program
env.execute("WordCount Example");
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into
@@ -82,4 +97,41 @@ public class WordCount {
}
}
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
private static void parseParameters(String[] args) {
if(args.length > 0) {
// parse input arguments
fileOutput = true;
if(args.length == 2) {
textPath = args[0];
outputPath = args[1];
} else {
System.err.println("Usage: WordCount <text path> <result path>");
System.exit(1);
}
} else {
System.out.println("Executing WordCount example with built-in default data.");
System.out.println(" Provide parameters to read input data from a file.");
System.out.println(" Usage: WordCount <text path> <result path>");
}
}
private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
if(fileOutput) {
// read the text file from given input path
return env.readTextFile(textPath);
} else {
// get default test text data
return WordCountData.getDefaultTextLineDataSet(env);
}
}
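
	// For comparison, a sketch of the built-in SUM aggregation above written as an
	// explicit reduce (assumes an additional import of
	// eu.stratosphere.api.java.functions.ReduceFunction; class name is illustrative):
	//
	//   text.flatMap(new Tokenizer()).groupBy(0).reduce(new CountSummer());
	//
	public static class CountSummer extends ReduceFunction<Tuple2<String, Integer>> {
		private static final long serialVersionUID = 1L;

		@Override
		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
			// keep the word, add up the partial counts
			return new Tuple2<String, Integer>(a.f0, a.f1 + b.f1);
		}
	}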
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.example.java.wordcount.util;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
public class WordCountData {
public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
return env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,",
"And by opposing end them?--To die,--to sleep,--",
"No more; and by a sleep to say we end",
"The heartache, and the thousand natural shocks",
"That flesh is heir to,--'tis a consummation",
"Devoutly to be wish'd. To die,--to sleep;--",
"To sleep! perchance to dream:--ay, there's the rub;",
"For in that sleep of death what dreams may come,",
"When we have shuffled off this mortal coil,",
"Must give us pause: there's the respect",
"That makes calamity of so long life;",
"For who would bear the whips and scorns of time,",
"The oppressor's wrong, the proud man's contumely,",
"The pangs of despis'd love, the law's delay,",
"The insolence of office, and the spurns",
"That patient merit of the unworthy takes,",
"When he himself might his quietus make",
"With a bare bodkin? who would these fardels bear,",
"To grunt and sweat under a weary life,",
"But that the dread of something after death,--",
"The undiscover'd country, from whose bourn",
"No traveller returns,--puzzles the will,",
"And makes us rather bear those ills we have",
"Than fly to others that we know not of?",
"Thus conscience does make cowards of us all;",
"And thus the native hue of resolution",
"Is sicklied o'er with the pale cast of thought;",
"And enterprises of great pith and moment,",
"With this regard, their currents turn awry,",
"And lose the name of action.--Soft you now!",
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."
);
}
}
......@@ -17,17 +17,17 @@ package eu.stratosphere.test.testdata;
public class EnumTriangleData {
public static final String EDGES =
"1,2\n" +
"1,3\n" +
"1,4\n" +
"1,5\n" +
"2,3\n" +
"2,5\n" +
"3,4\n" +
"3,7\n" +
"5,6\n" +
"3,8\n" +
"7,8\n";
"1 2\n" +
"1 3\n" +
"1 4\n" +
"1 5\n" +
"2 3\n" +
"2 5\n" +
"3 4\n" +
"3 7\n" +
"5 6\n" +
"3 8\n" +
"7 8\n";
public static final String TRIANGLES_BY_ID =
"1,2,3\n" +
......
......@@ -25,35 +25,35 @@ public class PageRankData {
"3\n" +
"4";
public static final String VERTICES_WITH_INITIAL_RANK = "1,0.2\n" +
"2,0.2\n" +
"5,0.2\n" +
"3,0.2\n" +
"4,0.2";
public static final String VERTICES_WITH_INITIAL_RANK = "1 0.2\n" +
"2 0.2\n" +
"5 0.2\n" +
"3 0.2\n" +
"4 0.2";
public static final String EDGES = "2,1\n" +
"5,2\n" +
"5,4\n" +
"4,3\n" +
"4,2\n" +
"1,4\n" +
"1,2\n" +
"1,3\n" +
"3,5\n";
public static final String EDGES = "2 1\n" +
"5 2\n" +
"5 4\n" +
"4 3\n" +
"4 2\n" +
"1 4\n" +
"1 2\n" +
"1 3\n" +
"3 5\n";
public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" +
"2,0.248\n" +
"3,0.173\n" +
"4,0.175\n" +
"5,0.165";
public static final String RANKS_AFTER_3_ITERATIONS = "1 0.237\n" +
"2 0.248\n" +
"3 0.173\n" +
"4 0.175\n" +
"5 0.165";
public static final String RANKS_AFTER_EPSILON_0_0001_CONVERGENCE = "1,0.238\n" +
"2,0.244\n" +
"3,0.170\n" +
"4,0.171\n" +
"5,0.174";
public static final String RANKS_AFTER_EPSILON_0_0001_CONVERGENCE = "1 0.238\n" +
"2 0.244\n" +
"3 0.170\n" +
"4 0.171\n" +
"5 0.174";
private PageRankData() {}
}
......@@ -19,13 +19,10 @@ import org.junit.Assert;
import org.junit.Test;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
import eu.stratosphere.example.java.graph.ConnectedComponents;
import eu.stratosphere.test.compiler.CompilerTestBase;
import eu.stratosphere.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
import eu.stratosphere.test.recordJobs.kmeans.KMeansBroadcast;
import eu.stratosphere.test.recordJobs.kmeans.KMeansSingleStep;
import eu.stratosphere.test.recordJobs.relational.TPCHQuery3;
......@@ -63,16 +60,8 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
}
@Test
public void dumpConnectedComponents() {
// take the core program and create dummy sources and sinks around it
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
DataSet<Long> vertices = env.fromElements(1L);
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1l, 2l));
ConnectedComponents.doConnectedComponents(vertices, edges, 10).print();
dump(env.createProgramPlan());
public void dumpDeltaPageRank() {
dump(new DeltaPageRankWithInitialDeltas().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
}
private void dump(Plan p) {
......
......@@ -21,13 +21,10 @@ import org.junit.Assert;
import org.junit.Test;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.dag.DataSinkNode;
import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
import eu.stratosphere.example.java.graph.ConnectedComponents;
import eu.stratosphere.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
import eu.stratosphere.test.recordJobs.kmeans.KMeansBroadcast;
import eu.stratosphere.test.recordJobs.kmeans.KMeansSingleStep;
import eu.stratosphere.test.recordJobs.relational.TPCHQuery3;
......@@ -80,18 +77,9 @@ public class PreviewPlanDumpTest {
}
@Test
public void dumpConnectedComponents() {
// take the core program and create dummy sources and sinks around it
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
DataSet<Long> vertices = env.fromElements(1L);
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1l, 2l));
ConnectedComponents.doConnectedComponents(vertices, edges, 10).print();
Plan p = env.createProgramPlan();
dump(p);
public void dumpDeltaPageRank() {
dump(new DeltaPageRankWithInitialDeltas().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
dump(new DeltaPageRankWithInitialDeltas().getPlan(NO_ARGS));
}
private void dump(Plan p) {
......
......@@ -14,7 +14,7 @@
**********************************************************************************************************************/
package eu.stratosphere.test.exampleJavaPrograms;
import eu.stratosphere.example.java.triangles.EnumTrianglesBasic;
import eu.stratosphere.example.java.graph.EnumTrianglesBasic;
import eu.stratosphere.test.testdata.EnumTriangleData;
import eu.stratosphere.test.util.JavaProgramTestBase;
......
......@@ -14,7 +14,7 @@
**********************************************************************************************************************/
package eu.stratosphere.test.exampleJavaPrograms;
import eu.stratosphere.example.java.triangles.EnumTrianglesOpt;
import eu.stratosphere.example.java.graph.EnumTrianglesOpt;
import eu.stratosphere.test.testdata.EnumTriangleData;
import eu.stratosphere.test.util.JavaProgramTestBase;
......
......@@ -24,7 +24,7 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.example.java.graph.SimplePageRank;
import eu.stratosphere.example.java.graph.PageRankBasic;
import eu.stratosphere.test.testdata.PageRankData;
import eu.stratosphere.test.util.JavaProgramTestBase;
......@@ -58,7 +58,7 @@ public class PageRankITCase extends JavaProgramTestBase {
@Override
protected void postSubmit() throws Exception {
compareKeyValueParisWithDelta(expectedResult, resultPath, ",", 0.01);
compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
}
@Parameters
......@@ -80,12 +80,12 @@ public class PageRankITCase extends JavaProgramTestBase {
switch(progId) {
case 1: {
SimplePageRank.runPageRank(verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES, 3);
PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
return PageRankData.RANKS_AFTER_3_ITERATIONS;
}
case 2: {
// start with a very high number of iterations such that the dynamic convergence criterion must handle termination
SimplePageRank.runPageRank(verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES, 1000);
PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
}
......
......@@ -37,7 +37,7 @@ public class WordCountITCase extends JavaProgramTestBase {
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath);
compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
}
@Override
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.test.javaApiOperators;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.example.java.incremental.pagerank.SimpleDeltaPageRank;
import eu.stratosphere.test.util.JavaProgramTestBase;
@RunWith(Parameterized.class)
public class DeltaPageRankITCase extends JavaProgramTestBase {
private static final String INITIAL_VERTICES_WITH_RANK =
"1 0.025\n" +
"2 0.125\n" +
"3 0.0833333333333333\n" +
"4 0.0833333333333333\n" +
"5 0.075\n" +
"6 0.075\n" +
"7 0.183333333333333\n" +
"8 0.15\n" +
"9 0.1\n";
private static final String INITIAL_DELTAS = "1 -0.075\n" +
"2 0.025\n" +
"3 -0.0166666666666667\n" +
"4 -0.0166666666666667\n" +
"5 -0.025\n" +
"6 -0.025\n" +
"7 0.0833333333333333\n" +
"8 0.05\n" +
"9 0\n";
private static final String EDGES = "1 2 2\n" +
"1 3 2\n" +
"2 3 3\n" +
"2 4 3\n" +
"3 1 4\n" +
"3 2 4\n" +
"4 2 2\n" +
"5 6 2\n" +
"6 5 2\n" +
"7 8 2\n" +
"7 9 2\n" +
"8 7 2\n" +
"8 9 2\n" +
"9 7 2\n" +
"9 8 2\n" +
"3 5 4\n" +
"3 6 4\n" +
"4 8 2\n" +
"2 7 3\n" +
"5 7 2\n" +
"6 4 2\n";
private static final int NUM_PROGRAMS = 1;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
protected static String pagesPath;
protected static String edgesPath;
protected static String deltasPath;
private String expectedResult;
public DeltaPageRankITCase(Configuration config) {
super(config);
}
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
pagesPath = createTempFile("pages.txt", INITIAL_VERTICES_WITH_RANK);
edgesPath = createTempFile("edges.txt", EDGES);
deltasPath = createTempFile("deltas.txt", INITIAL_DELTAS);
}
@Override
protected void testProgram() throws Exception {
expectedResult = TestPrograms.runProgram(curProgId, resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(expectedResult, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
for(int i=1; i <= NUM_PROGRAMS; i++) {
Configuration config = new Configuration();
config.setInteger("ProgramId", i);
tConfigs.add(config);
}
return toParameterList(tConfigs);
}
private static class TestPrograms {
public static String runProgram(int progId, String resultPath) throws Exception {
switch(progId) {
case 1: {
SimpleDeltaPageRank.run(1, pagesPath, deltasPath, edgesPath, resultPath, 4, false);
// return expected result
return "1,0.006987847222222211\n" +
"2,0.032682291666666634\n" +
"3,0.018663194444444395\n" +
"4,0.029340277777777726\n" +
"5,0.02209201388888886\n" +
"6,0.02209201388888886\n" +
"7,0.2621527777777774\n" +
"8,0.2607638888888888\n" +
"9,0.2452256944444444\n";
}
default:
throw new IllegalArgumentException("Invalid program id");
}
}
}
}