diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java index e468a58d68e2c680995046e0b54c046db78ab2a2..af2a11c1ab53b485f51f0f89612e1ff915340693 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java @@ -19,8 +19,8 @@ package org.apache.flink.graph; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.drivers.AdamicAdar; @@ -46,11 +46,11 @@ import org.apache.flink.graph.drivers.input.RMatGraph; import org.apache.flink.graph.drivers.input.SingletonEdgeGraph; import org.apache.flink.graph.drivers.input.StarGraph; import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Output; import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.Parameterized; import org.apache.flink.util.InstantiationUtil; -import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; import java.util.ArrayList; @@ -103,6 +103,11 @@ public class Runner { .addClass(PageRank.class) .addClass(TriangleListing.class); + private static ParameterizedFactory outputFactory = new ParameterizedFactory() + .addClass(org.apache.flink.graph.drivers.output.CSV.class) + .addClass(Hash.class) + .addClass(Print.class); + /** * List available algorithms. This is displayed to the user when no valid * algorithm is given in the program parameterization. @@ -174,16 +179,12 @@ public class Runner { .appendNewLine() .appendln("Available outputs:"); - if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) { - strBuilder.appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]"); - } - - if (algorithm instanceof Hash) { - strBuilder.appendln(" --output hash"); - } - - if (algorithm instanceof Print) { - strBuilder.appendln(" --output print"); + for (Output output : outputFactory) { + strBuilder + .append(" --output ") + .append(output.getName()) + .append(" ") + .appendln(output.getUsage()); } return strBuilder @@ -211,8 +212,11 @@ public class Runner { config.enableObjectReuse(); } - // Usage + // ---------------------------------------------------------------------------------------- + // Usage and configuration + // ---------------------------------------------------------------------------------------- + // algorithm and usage if (!parameters.has(ALGORITHM)) { throw new ProgramParametrizationException(getAlgorithmsListing()); } @@ -224,6 +228,7 @@ public class Runner { throw new ProgramParametrizationException("Unknown algorithm name: " + algorithmName); } + // input and usage if (!parameters.has(INPUT)) { if (!parameters.has(OUTPUT)) { // if neither input nor output is given then print algorithm usage @@ -232,6 +237,12 @@ public class Runner { throw new ProgramParametrizationException("No input given"); } + try { + algorithm.configure(parameters); + } catch (RuntimeException ex) { + throw new ProgramParametrizationException(ex.getMessage()); + } + String inputName = parameters.get(INPUT); Input input = inputFactory.get(inputName); @@ -239,72 +250,52 @@ public class Runner { throw new ProgramParametrizationException("Unknown input type: " + inputName); } - // Input - try { input.configure(parameters); } catch (RuntimeException ex) { throw new ProgramParametrizationException(ex.getMessage()); } - Graph graph = input.create(env); - - // Algorithm - - algorithm.configure(parameters); - algorithm.plan(graph); - - // Output + // output and usage if (!parameters.has(OUTPUT)) { throw new ProgramParametrizationException("No output given"); } String outputName = parameters.get(OUTPUT); - String executionNamePrefix = input.getIdentity() + " -> " + algorithmName + " -> "; + Output output = outputFactory.get(outputName); - System.out.println(); + if (output == null) { + throw new ProgramParametrizationException("Unknown output type: " + outputName); + } + + try { + output.configure(parameters); + } catch (RuntimeException ex) { + throw new ProgramParametrizationException(ex.getMessage()); + } - switch (outputName.toLowerCase()) { - case "csv": - if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) { - String filename = parameters.getRequired("output_filename"); + // ---------------------------------------------------------------------------------------- + // Execute + // ---------------------------------------------------------------------------------------- - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + // Create input + Graph graph = input.create(env); - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); + // Run algorithm + DataSet results = algorithm.plan(graph); - org.apache.flink.graph.drivers.output.CSV c = (org.apache.flink.graph.drivers.output.CSV) algorithm; - c.writeCSV(filename, lineDelimiter, fieldDelimiter); + // Output + String executionName = input.getIdentity() + " ⇨ " + algorithmName + " ⇨ " + output.getName(); - env.execute(executionNamePrefix + "CSV"); - } else { - throw new ProgramParametrizationException("Algorithm does not support output type 'CSV'"); - } - break; - - case "hash": - if (algorithm instanceof Hash) { - Hash h = (Hash) algorithm; - h.hash(executionNamePrefix + "Hash"); - } else { - throw new ProgramParametrizationException("Algorithm does not support output type 'hash'"); - } - break; - - case "print": - if (algorithm instanceof Print) { - Print h = (Print) algorithm; - h.print(executionNamePrefix + "Print"); - } else { - throw new ProgramParametrizationException("Algorithm does not support output type 'print'"); - } - break; + System.out.println(); - default: - throw new ProgramParametrizationException("Unknown output type: " + outputName); + if (results == null) { + env.execute(executionName); + } else { + output.write(executionName, System.out, results); } + + algorithm.printAnalytics(System.out); } /** @@ -336,7 +327,7 @@ public class Runner { */ public T get(String name) { for (T instance : this) { - if (name.equals(instance.getName())) { + if (name.equalsIgnoreCase(instance.getName())) { return instance; } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java index c5867ed6f5ee7d058f804f428028f43012357fe6..e439ccd9403b3cbc075bc601f2c3c77b7bfa2412 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java @@ -20,11 +20,8 @@ package org.apache.flink.graph.drivers; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.drivers.output.CSV; -import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.DoubleParameter; import org.apache.flink.graph.drivers.parameter.LongParameter; -import org.apache.flink.graph.library.similarity.AdamicAdar.Result; import org.apache.flink.types.CopyableValue; import org.apache.commons.lang3.text.StrBuilder; @@ -36,8 +33,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}. */ public class AdamicAdar, VV, EV> -extends SimpleDriver> -implements CSV, Print { +extends DriverBase { private DoubleParameter minRatio = new DoubleParameter(this, "minimum_ratio") .setDefaultValue(0.0) @@ -50,11 +46,6 @@ implements CSV, Print { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - @Override public String getShortDescription() { return "similarity score weighted by centerpoint degree"; @@ -72,7 +63,7 @@ implements CSV, Print { } @Override - protected DataSet> simplePlan(Graph graph) throws Exception { + public DataSet plan(Graph graph) throws Exception { int lp = littleParallelism.getValue().intValue(); return graph diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java index bcd8ec48479ccff44a10f7cc96c11acca49a02b2..14e953ac6b12365ff908a6ad6ab26cc29cf73ba8 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java @@ -22,9 +22,6 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; import org.apache.flink.graph.asm.result.PrintableResult; -import org.apache.flink.graph.drivers.output.CSV; -import org.apache.flink.graph.drivers.output.Hash; -import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.ChoiceParameter; import org.apache.flink.graph.drivers.parameter.LongParameter; import org.apache.flink.types.CopyableValue; @@ -32,6 +29,8 @@ import org.apache.flink.types.CopyableValue; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import java.io.PrintStream; + import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** @@ -45,8 +44,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient */ public class ClusteringCoefficient & CopyableValue, VV, EV> -extends SimpleDriver -implements CSV, Hash, Print { +extends DriverBase { private static final String DIRECTED = "directed"; @@ -62,11 +60,6 @@ implements CSV, Hash, Print { private GraphAnalytic averageClusteringCoefficient; - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - @Override public String getShortDescription() { return "measure the connectedness of vertex neighborhoods"; @@ -87,7 +80,7 @@ implements CSV, Hash, Print { } @Override - protected DataSet simplePlan(Graph graph) throws Exception { + public DataSet plan(Graph graph) throws Exception { int lp = littleParallelism.getValue().intValue(); switch (order.getValue()) { @@ -127,25 +120,8 @@ implements CSV, Hash, Print { } @Override - public void hash(String executionName) throws Exception { - super.hash(executionName); - printAnalytics(); - } - - @Override - public void print(String executionName) throws Exception { - super.print(executionName); - printAnalytics(); - } - - @Override - public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { - super.writeCSV(filename, lineDelimiter, fieldDelimiter); - printAnalytics(); - } - - private void printAnalytics() { - System.out.println(globalClusteringCoefficient.getResult().toPrintableString()); - System.out.println(averageClusteringCoefficient.getResult().toPrintableString()); + public void printAnalytics(PrintStream out) { + out.println(globalClusteringCoefficient.getResult().toPrintableString()); + out.println(averageClusteringCoefficient.getResult().toPrintableString()); } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java index 8a158a25ce64fea1823719baccfde1a7fe170c4b..32f94c125984b4caf8ebe624d2763f3b6daa8082 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java @@ -22,17 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.asm.dataset.ChecksumHashCode; -import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; -import org.apache.flink.graph.asm.dataset.Collect; -import org.apache.flink.graph.drivers.output.CSV; -import org.apache.flink.graph.drivers.output.Hash; -import org.apache.flink.graph.drivers.output.Print; -import org.apache.flink.graph.drivers.parameter.ParameterizedBase; import org.apache.flink.graph.library.GSAConnectedComponents; -import java.util.List; - /** * Driver for {@link org.apache.flink.graph.library.GSAConnectedComponents}. * @@ -40,15 +31,7 @@ import java.util.List; * handle object reuse (see FLINK-5891). */ public class ConnectedComponents, VV, EV> -extends ParameterizedBase -implements Driver, CSV, Hash, Print { - - private DataSet> components; - - @Override - public String getName() { - return this.getClass().getSimpleName(); - } +extends DriverBase { @Override public String getShortDescription() { @@ -61,37 +44,19 @@ implements Driver, CSV, Hash, Print { } @Override - public void plan(Graph graph) throws Exception { - components = graph + public DataSet plan(Graph graph) throws Exception { + return graph .mapVertices(new MapVertices()) .run(new GSAConnectedComponents(Integer.MAX_VALUE)); } - @Override - public void hash(String executionName) throws Exception { - Checksum checksum = new ChecksumHashCode>() - .run(components) - .execute(executionName); - - System.out.println(checksum); - } - - @Override - public void print(String executionName) throws Exception { - List> results = new Collect>().run(components).execute(executionName); - - for (Vertex result : results) { - System.out.println(result); - } - } - - @Override - public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { - components - .writeAsCsv(filename, lineDelimiter, fieldDelimiter) - .name("CSV: " + filename); - } - + /** + * Initialize vertices into separate components by setting each vertex + * value to the vertex ID. + * + * @param vertex ID type + * @param vertex value type + */ private static final class MapVertices implements MapFunction, T> { @Override diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java index fda9079786991ba27a355a7b93d248362cbbfa39..4ec76536eb78484856231ab9ad10223f181cf84c 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java @@ -18,14 +18,17 @@ package org.apache.flink.graph.drivers; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.GraphAnalytic; import org.apache.flink.graph.drivers.parameter.Parameterized; +import java.io.PrintStream; + /** - * A driver for one or more {@link GraphAlgorithm}s and/or - * {@link GraphAnalytic}s. + * A driver for one or more {@link GraphAlgorithm} and/or + * {@link GraphAnalytic}. * *

It is preferable to include multiple, overlapping algorithms/analytics in * the same driver both for simplicity and since this examples module @@ -59,8 +62,20 @@ extends Parameterized { *

Drivers are first configured, next planned, and finally the chosen * output method is called. * + *

A {@code null} value should be returned when the {@link Driver} does + * not execute a {@link GraphAlgorithm} but only executes a + * {@link GraphAnalytic}. + * * @param graph input graph * @throws Exception on error */ - void plan(Graph graph) throws Exception; + DataSet plan(Graph graph) throws Exception; + + /** + * Analytic results are summaries so are always printed to the console + * irrespective of the chosen {@code Output}. + * + * @param out output stream for printing results + */ + void printAnalytics(PrintStream out); } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java new file mode 100644 index 0000000000000000000000000000000000000000..38e4ea8498c69e0aa695e1d0a3a0844da3801c1d --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; + +import java.io.PrintStream; + +/** + * Base class for example drivers. + * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public abstract class DriverBase +extends ParameterizedBase +implements Driver { + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public void printAnalytics(PrintStream out) { + // analytics are optionally executed by drivers overriding this method + } +} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java index 5da02847f09682c32d00c250fb3cc363a126bb77..287e222aed3779b0423f870c6db249985d9ccb2b 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java @@ -25,18 +25,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.asm.dataset.ChecksumHashCode; -import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; -import org.apache.flink.graph.asm.dataset.Collect; -import org.apache.flink.graph.drivers.output.CSV; -import org.apache.flink.graph.drivers.output.Hash; -import org.apache.flink.graph.drivers.output.Print; -import org.apache.flink.graph.drivers.parameter.ParameterizedBase; import org.apache.flink.graph.utils.EdgeToTuple2Map; import org.apache.flink.types.NullValue; -import java.util.List; - /** * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}. * @@ -45,15 +36,7 @@ import java.util.List; * @param edge value type */ public class EdgeList -extends ParameterizedBase -implements Driver, CSV, Hash, Print { - - private DataSet> edges; - - @Override - public String getName() { - return this.getClass().getSimpleName(); - } +extends DriverBase { @Override public String getShortDescription() { @@ -66,47 +49,15 @@ implements Driver, CSV, Hash, Print { } @Override - public void plan(Graph graph) throws Exception { - edges = graph - .getEdges(); - } - - @Override - public void hash(String executionName) throws Exception { - Checksum checksum = new ChecksumHashCode>() - .run(edges) - .execute(executionName); - - System.out.println(checksum); - } - - @Override - public void print(String executionName) throws Exception { - List> records = new Collect>().run(edges).execute(executionName); + public DataSet plan(Graph graph) throws Exception { + DataSet> edges = graph.getEdges(); if (hasNullValueEdges(edges)) { - for (Edge result : records) { - System.out.println("(" + result.f0 + "," + result.f1 + ")"); - } - } else { - for (Edge result : records) { - System.out.println(result); - } - } - } - - @Override - public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { - if (hasNullValueEdges(edges)) { - edges + return edges .map(new EdgeToTuple2Map()) - .name("Edge to Tuple2") - .writeAsCsv(filename, lineDelimiter, fieldDelimiter) - .name("CSV: " + filename); + .name("Edge to Tuple2"); } else { - edges - .writeAsCsv(filename, lineDelimiter, fieldDelimiter) - .name("CSV: " + filename); + return edges; } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java index ea0222504eede6a506390a858729ff60f62a124b..f4da13c656a7ead9202c33e754668ef8fb096c3e 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java @@ -18,16 +18,16 @@ package org.apache.flink.graph.drivers; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; import org.apache.flink.graph.asm.result.PrintableResult; -import org.apache.flink.graph.drivers.output.Hash; -import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.ChoiceParameter; -import org.apache.flink.graph.drivers.parameter.ParameterizedBase; import org.apache.commons.lang3.text.StrBuilder; +import java.io.PrintStream; + /** * Driver for directed and undirected graph metrics analytics. * @@ -37,8 +37,7 @@ import org.apache.commons.lang3.text.StrBuilder; * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics */ public class GraphMetrics, VV, EV> -extends ParameterizedBase -implements Driver, Hash, Print { +extends DriverBase { private static final String DIRECTED = "directed"; @@ -51,11 +50,6 @@ implements Driver, Hash, Print { private GraphAnalytic edgeMetrics; - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - @Override public String getShortDescription() { return "compute vertex and edge metrics"; @@ -87,7 +81,7 @@ implements Driver, Hash, Print { } @Override - public void plan(Graph graph) throws Exception { + public DataSet plan(Graph graph) throws Exception { switch (order.getValue()) { case DIRECTED: vertexMetrics = graph @@ -105,22 +99,17 @@ implements Driver, Hash, Print { .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); break; } - } - @Override - public void hash(String executionName) throws Exception { - print(executionName); + return null; } @Override - public void print(String executionName) throws Exception { - vertexMetrics.execute(executionName); - - System.out.print("Vertex metrics:\n "); - System.out.println(vertexMetrics.getResult().toPrintableString().replace(";", "\n ")); + public void printAnalytics(PrintStream out) { + out.print("Vertex metrics:\n "); + out.println(vertexMetrics.getResult().toPrintableString().replace(";", "\n ")); - System.out.println(); - System.out.print("Edge metrics:\n "); - System.out.println(edgeMetrics.getResult().toPrintableString().replace(";", "\n ")); + out.println(); + out.print("Edge metrics:\n "); + out.println(edgeMetrics.getResult().toPrintableString().replace(";", "\n ")); } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java index 6f24c091bec14c3730c005610ae23891ff600b8a..1987421ca8466820ce3e0ce6591eefa7840568cc 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java @@ -20,10 +20,7 @@ package org.apache.flink.graph.drivers; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.drivers.output.CSV; -import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.IterationConvergence; -import org.apache.flink.graph.library.linkanalysis.HITS.Result; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; @@ -32,18 +29,12 @@ import org.apache.commons.lang3.text.WordUtils; * Driver for {@link org.apache.flink.graph.library.linkanalysis.HITS}. */ public class HITS -extends SimpleDriver> -implements CSV, Print { +extends DriverBase { private static final int DEFAULT_ITERATIONS = 10; private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS); - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - @Override public String getShortDescription() { return "score vertices as hubs and authorities"; @@ -61,7 +52,7 @@ implements CSV, Print { } @Override - protected DataSet> simplePlan(Graph graph) throws Exception { + public DataSet plan(Graph graph) throws Exception { return graph .run(new org.apache.flink.graph.library.linkanalysis.HITS( iterationConvergence.getValue().iterations, diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java index f6e10f06c3e1b0cf24081ff5e4634394b6bc66e5..8f6cfb7fb34f96eb384476175d306423432af7bd 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java @@ -20,12 +20,8 @@ package org.apache.flink.graph.drivers; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.drivers.output.CSV; -import org.apache.flink.graph.drivers.output.Hash; -import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.BooleanParameter; import org.apache.flink.graph.drivers.parameter.LongParameter; -import org.apache.flink.graph.library.similarity.JaccardIndex.Result; import org.apache.flink.types.CopyableValue; import org.apache.commons.lang3.text.StrBuilder; @@ -37,8 +33,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}. */ public class JaccardIndex, VV, EV> -extends SimpleDriver> -implements CSV, Hash, Print { +extends DriverBase { private LongParameter minNumerator = new LongParameter(this, "minimum_numerator") .setDefaultValue(0) @@ -56,15 +51,10 @@ implements CSV, Hash, Print { .setDefaultValue(1) .setMinimumValue(1); - private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") - .setDefaultValue(PARALLELISM_DEFAULT); - private BooleanParameter mirrorResults = new BooleanParameter(this, "mirror_results"); - @Override - public String getName() { - return this.getClass().getSimpleName(); - } + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); @Override public String getShortDescription() { @@ -85,7 +75,7 @@ implements CSV, Hash, Print { } @Override - protected DataSet> simplePlan(Graph graph) throws Exception { + public DataSet plan(Graph graph) throws Exception { int lp = littleParallelism.getValue().intValue(); return graph diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java index b2602b95995588ff4b6d2dabe386f1cc3c81f677..224dea8798a3d276e39cdadb2aa0e45f01e0a2ba 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java @@ -20,11 +20,8 @@ package org.apache.flink.graph.drivers; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.drivers.output.CSV; -import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.DoubleParameter; import org.apache.flink.graph.drivers.parameter.IterationConvergence; -import org.apache.flink.graph.library.linkanalysis.PageRank.Result; import org.apache.commons.lang3.text.StrBuilder; @@ -32,8 +29,7 @@ import org.apache.commons.lang3.text.StrBuilder; * @see org.apache.flink.graph.library.linkanalysis.PageRank */ public class PageRank -extends SimpleDriver> -implements CSV, Print { +extends DriverBase { private static final int DEFAULT_ITERATIONS = 10; @@ -44,11 +40,6 @@ implements CSV, Print { private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS); - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - @Override public String getShortDescription() { return "score vertices by the number and quality of incoming links"; @@ -66,7 +57,7 @@ implements CSV, Print { } @Override - protected DataSet> simplePlan(Graph graph) throws Exception { + public DataSet plan(Graph graph) throws Exception { return graph .run(new org.apache.flink.graph.library.linkanalysis.PageRank( dampingFactor.getValue(), diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java deleted file mode 100644 index a5ace263ae6526bb34e4b18f475edff63afd7f64..0000000000000000000000000000000000000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.drivers; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.asm.dataset.ChecksumHashCode; -import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; -import org.apache.flink.graph.asm.dataset.Collect; -import org.apache.flink.graph.asm.result.PrintableResult; -import org.apache.flink.graph.drivers.parameter.ParameterizedBase; - -import java.util.List; - -/** - * A base driver storing a single result {@link DataSet} with values - * implementing {@link PrintableResult}. - * - * @param algorithm's result type - */ -public abstract class SimpleDriver -extends ParameterizedBase -implements Driver { - - private DataSet result; - - protected DataSet getResult() { - return result; - } - - /** - * Plan the algorithm and return the result {@link DataSet}. - * - * @param graph input graph - * @return driver output - * @throws Exception on error - */ - protected abstract DataSet simplePlan(Graph graph) throws Exception; - - @Override - public void plan(Graph graph) throws Exception { - result = simplePlan(graph); - } - - /** - * Print hash of execution results. - * - *

Does *not* implement/override {@code Hash} since {@link Driver} - * implementations designate the appropriate outputs. - * - * @param executionName job name - * @throws Exception on error - */ - public void hash(String executionName) throws Exception { - Checksum checksum = new ChecksumHashCode() - .run(result) - .execute(executionName); - - System.out.println(checksum); - } - - /** - * Print execution results. - * - *

Does *not* implement/override {@code Print} since {@link Driver} - * implementations designate the appropriate outputs. - * - * @param executionName job name - * @throws Exception on error - */ - public void print(String executionName) throws Exception { - List results = new Collect().run(result).execute(executionName); - - for (R result : results) { - System.out.println(result.toPrintableString()); - } - } - - /** - * Write execution results to file using CSV format. - * - *

Does *not* implement/override {@code CSV} since {@link Driver} - * implementations designate the appropriate outputs. - * - * @param filename output filename - * @param lineDelimiter CSV delimiter between lines - * @param fieldDelimiter CSV delimiter between fields - */ - public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { - result - .writeAsCsv(filename, lineDelimiter, fieldDelimiter) - .name("CSV: " + filename); - } -} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java index 1c9bdc583419ebc59c59a458820d4be1e99e3a2c..86a61d552f6fc8e82f660ba62c74e64f238e755e 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java @@ -22,9 +22,6 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; import org.apache.flink.graph.asm.result.PrintableResult; -import org.apache.flink.graph.drivers.output.CSV; -import org.apache.flink.graph.drivers.output.Hash; -import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.BooleanParameter; import org.apache.flink.graph.drivers.parameter.ChoiceParameter; import org.apache.flink.graph.drivers.parameter.LongParameter; @@ -33,6 +30,8 @@ import org.apache.flink.types.CopyableValue; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import java.io.PrintStream; + import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** @@ -44,8 +43,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus */ public class TriangleListing & CopyableValue, VV, EV> -extends SimpleDriver -implements CSV, Hash, Print { +extends DriverBase { private static final String DIRECTED = "directed"; @@ -63,11 +61,6 @@ implements CSV, Hash, Print { private GraphAnalytic triadicCensus; - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - @Override public String getShortDescription() { return "list triangles"; @@ -85,7 +78,7 @@ implements CSV, Hash, Print { } @Override - protected DataSet simplePlan(Graph graph) throws Exception { + public DataSet plan(Graph graph) throws Exception { int lp = littleParallelism.getValue().intValue(); switch (order.getValue()) { @@ -123,27 +116,10 @@ implements CSV, Hash, Print { } @Override - public void hash(String executionName) throws Exception { - super.hash(executionName); - printAnalytics(); - } - - @Override - public void print(String executionName) throws Exception { - super.print(executionName); - printAnalytics(); - } - - @Override - public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { - super.writeCSV(filename, lineDelimiter, fieldDelimiter); - printAnalytics(); - } - - private void printAnalytics() { + public void printAnalytics(PrintStream out) { if (computeTriadicCensus.getValue()) { - System.out.print("Triadic census:\n "); - System.out.println(triadicCensus.getResult().toPrintableString().replace(";", "\n ")); + out.print("Triadic census:\n "); + out.println(triadicCensus.getResult().toPrintableString().replace(";", "\n ")); } } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java index b3f88f6d5b99e6db72908d2aced57fb0b00ee319..697da97d50df107eed1b6fb7cdccfe3f9f6e469e 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java @@ -24,7 +24,6 @@ import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphCsvReader; import org.apache.flink.graph.drivers.parameter.ChoiceParameter; -import org.apache.flink.graph.drivers.parameter.ParameterizedBase; import org.apache.flink.graph.drivers.parameter.Simplify; import org.apache.flink.graph.drivers.parameter.StringParameter; import org.apache.flink.types.IntValue; @@ -41,8 +40,7 @@ import org.apache.commons.lang3.text.WordUtils; * @param key type */ public class CSV> -extends ParameterizedBase -implements Input { +extends InputBase { private static final String INTEGER = "integer"; @@ -67,11 +65,6 @@ implements Input { private Simplify simplify = new Simplify(this); - @Override - public String getName() { - return CSV.class.getSimpleName(); - } - @Override public String getIdentity() { return WordUtils.capitalize(getName()) + WordUtils.capitalize(type.getValue()) + " (" + inputFilename + ")"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java index a5a2540adacb5a274db3dc3039b98a73b4c47406..a9d05a2edb4d8437c3ed71691e797395f013667a 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java @@ -51,14 +51,9 @@ extends GeneratedGraph { private List offsetRanges = new ArrayList<>(); - @Override - public String getName() { - return CirculantGraph.class.getSimpleName(); - } - @Override public String getUsage() { - return "--" + PREFIX + "0 offset:length [--" + PREFIX + "1 offset:length [--" + PREFIX + "2 ...]]" + return "--" + PREFIX + "0 offset:length [--" + PREFIX + "1 offset:length [--" + PREFIX + "2 ...]] " + super.getUsage(); } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java index dc85df44b4fc834cc3b223173b1986477e70e644..64bae73d35544989541f06efaa8e1999559f440e 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java @@ -39,11 +39,6 @@ extends GeneratedGraph { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return CompleteGraph.class.getSimpleName(); - } - @Override public String getIdentity() { return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ")"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java index 9ef67c35a12c24687f317680605046cd2ae6c8d8..d84cfca5a84bea2a0ce6e3ee47a1c4ed8405b220 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java @@ -39,11 +39,6 @@ extends GeneratedGraph { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return CycleGraph.class.getSimpleName(); - } - @Override public String getIdentity() { return getTypeName() + " " + getName() + " (" + vertexCount + ")"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java index c9b0874562da580443e7b053d2a85899c10b5fc2..5ca2f2fe100fd2aeba465d4ed0ca380b22870155 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java @@ -43,11 +43,6 @@ extends GeneratedGraph { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return EchoGraph.class.getSimpleName(); - } - @Override public String getIdentity() { return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ":" + vertexDegree.getValue() + ")"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java index e7b59421a2be9797638955f70d2146248d068aca..6feb3c88921c175115c87c777f9352209118e0d3 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java @@ -35,11 +35,6 @@ extends GeneratedGraph { private LongParameter vertexCount = new LongParameter(this, "vertex_count") .setMinimumValue(MINIMUM_VERTEX_COUNT); - @Override - public String getName() { - return EmptyGraph.class.getSimpleName(); - } - @Override public String getIdentity() { return getTypeName() + " " + getName() + " (" + vertexCount + ")"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java index d4467df1f63e89fce58eec017ea344071d8b49e5..a0446eee90dfe619bd0781b597a70235a4080fb0 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java @@ -26,7 +26,6 @@ import org.apache.flink.graph.asm.translate.TranslateGraphIds; import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue; import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; import org.apache.flink.graph.drivers.parameter.ChoiceParameter; -import org.apache.flink.graph.drivers.parameter.ParameterizedBase; import org.apache.flink.types.ByteValue; import org.apache.flink.types.CharValue; import org.apache.flink.types.LongValue; @@ -41,8 +40,7 @@ import org.apache.commons.lang3.text.WordUtils; * @param graph ID type */ public abstract class GeneratedGraph -extends ParameterizedBase -implements Input { +extends InputBase { private static final String BYTE = "byte"; private static final String NATIVE_BYTE = "nativeByte"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java index a3aabd923116bce353ab0940e7e14e2b6b97330a..1b6bac1dd5d026454f5dbace29bd9b0c5b9940a2 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java @@ -47,14 +47,9 @@ extends GeneratedGraph { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return GridGraph.class.getSimpleName(); - } - @Override public String getUsage() { - return "--" + PREFIX + "0 size:wrap_endpoints [--" + PREFIX + " size:wrap_endpoints [--" + PREFIX + " ...]]" + return "--" + PREFIX + "0 size:wrap_endpoints [--" + PREFIX + " size:wrap_endpoints [--" + PREFIX + " ...]] " + super.getUsage(); } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java index 8d1e8b166a9537ab822aeeda9fde05f8e789c492..1be65bd51264eb435c62ba154f21fc4799f29f96 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java @@ -40,11 +40,6 @@ extends GeneratedGraph { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return HypercubeGraph.class.getSimpleName(); - } - @Override public String getIdentity() { return getTypeName() + " " + getName() + " (" + dimensions + ")"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/InputBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/InputBase.java new file mode 100644 index 0000000000000000000000000000000000000000..60b5217df290322e26a3eac72db5eff501c78303 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/InputBase.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; + +/** + * Base class for inputs. + * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public abstract class InputBase +extends ParameterizedBase +implements Input { + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } +} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java index 9e02056c7c405b67d70ea186a5e4f217f60d4f7d..7f3a3e507e6a98c344aa8441b83ec6e3d7a7f208 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java @@ -39,11 +39,6 @@ extends GeneratedGraph { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return PathGraph.class.getSimpleName(); - } - @Override public String getIdentity() { return getTypeName() + " " + getName() + " (" + vertexCount + ")"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java index 3b7508988b5a83ff133abc26a8ff5d0ecf73748f..66ba8887d9659170f60f2750950098643f32bda3 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java @@ -82,11 +82,6 @@ extends GeneratedMultiGraph { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - @Override public String getIdentity() { return getTypeName() + " " + getName() + diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java index 65e0196ec065906f32dd42134f0c75b743fdd27b..44da3f3de5bb7f7126441eda8ddf60d43913a512 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java @@ -40,11 +40,6 @@ extends GeneratedGraph { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return SingletonEdgeGraph.class.getSimpleName(); - } - @Override public String getIdentity() { return getTypeName() + " " + getName() + " (" + vertexPairCount + ")"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java index b37dc493a4163ec6d203a60188a3f8597de8bbaf..d488b59885da85a4bd7f9cd8b75d1ece4dea83a8 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java @@ -39,11 +39,6 @@ extends GeneratedGraph { private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); - @Override - public String getName() { - return StarGraph.class.getSimpleName(); - } - @Override public String getIdentity() { return getTypeName() + " " + getName() + " (" + vertexCount + ")"; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java index 5d1faeb845e74fd10905e71474fb33825f463874..9b6aae671ec1710ae7a988d7d4ce30493dc2dc93 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java @@ -18,17 +18,32 @@ package org.apache.flink.graph.drivers.output; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.graph.drivers.parameter.StringParameter; + +import java.io.PrintStream; + /** * Write algorithm output to file using CSV format. + * + * @param result Type */ -public interface CSV { - - /** - * Write execution results to file using CSV format. - * - * @param filename output filename - * @param lineDelimiter CSV delimiter between lines - * @param fieldDelimiter CSV delimiter between fields - */ - void writeCSV(String filename, String lineDelimiter, String fieldDelimiter); +public class CSV +extends OutputBase { + + private StringParameter filename = new StringParameter(this, "output_filename"); + + private StringParameter lineDelimiter = new StringParameter(this, "output_line_delimiter") + .setDefaultValue(CsvOutputFormat.DEFAULT_LINE_DELIMITER); + + private StringParameter fieldDelimiter = new StringParameter(this, "output_field_delimiter") + .setDefaultValue(CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + + @Override + public void write(String executionName, PrintStream out, DataSet data) throws Exception { + data + .writeAsCsv(filename.getValue(), lineDelimiter.getValue(), fieldDelimiter.getValue()) + .name("CSV: " + filename.getValue()); + } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java index e1c399eaf4181fb2f5f08409c25d5e963ec84f0b..a853339558336541eb296874612d017276dc1ff6 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java @@ -18,16 +18,33 @@ package org.apache.flink.graph.drivers.output; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.drivers.parameter.BooleanParameter; + +import java.io.PrintStream; + /** * Print hash of algorithm output. + * + * @param result Type */ -public interface Hash { - - /** - * Print hash of execution results. - * - * @param executionName job name - * @throws Exception on error - */ - void hash(String executionName) throws Exception; +public class Hash +extends OutputBase { + + private BooleanParameter printExecutionPlan = new BooleanParameter(this, "__print_execution_plan"); + + @Override + public void write(String executionName, PrintStream out, DataSet data) throws Exception { + ChecksumHashCode checksumHashCode = new ChecksumHashCode().run(data); + + if (printExecutionPlan.getValue()) { + System.out.println(data.getExecutionEnvironment().getExecutionPlan()); + } + + ChecksumHashCode.Checksum checksum = checksumHashCode + .execute(executionName); + + out.println(checksum); + } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Output.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Output.java new file mode 100644 index 0000000000000000000000000000000000000000..710adce3a16362989e358a0f30da54688c5ecca3 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Output.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.output; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.drivers.parameter.Parameterized; + +import java.io.PrintStream; + +/** + * Output writer for a {@link GraphAlgorithm} result. + */ +public interface Output +extends Parameterized { + + /** + * Write the output {@link DataSet}. + * + * @param executionName job name + * @param out output printer + * @param data the output + */ + void write(String executionName, PrintStream out, DataSet data) throws Exception; +} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/OutputBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/OutputBase.java new file mode 100644 index 0000000000000000000000000000000000000000..cbeff30c65585fd04e062c5e5e7bdcae162c9585 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/OutputBase.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.graph.drivers.output; + +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; + +/** + * Base class for outputs. + * + * @param result Type + */ +public abstract class OutputBase +extends ParameterizedBase +implements Output { + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } +} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java index be421b0ee143264b1b1a114f0494c3efafad604b..623fed493586ad061442d57497bbfed0f0ab7c49 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java @@ -18,16 +18,46 @@ package org.apache.flink.graph.drivers.output; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.asm.dataset.Collect; +import org.apache.flink.graph.asm.result.PrintableResult; +import org.apache.flink.graph.drivers.parameter.BooleanParameter; + +import java.io.PrintStream; +import java.util.List; + /** * Print algorithm output. + * + * @param result Type */ -public interface Print { - - /** - * Print execution results. - * - * @param executionName job name - * @throws Exception on error - */ - void print(String executionName) throws Exception; +public class Print +extends OutputBase { + + private BooleanParameter printExecutionPlan = new BooleanParameter(this, "__print_execution_plan"); + + @Override + public void write(String executionName, PrintStream out, DataSet data) throws Exception { + Collect collector = new Collect().run(data); + + if (printExecutionPlan.getValue()) { + System.out.println(data.getExecutionEnvironment().getExecutionPlan()); + } + + List results = collector.execute(executionName); + + if (results.size() == 0) { + return; + } + + if (results.get(0) instanceof PrintableResult) { + for (Object result : results) { + System.out.println(((PrintableResult) result).toPrintableString()); + } + } else { + for (Object result : results) { + System.out.println(result); + } + } + } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java index f02c53699996562830c813c74022f731321285e4..6730987ba00ee167c58c928a416ad778d2b8f901 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java @@ -53,7 +53,12 @@ implements Parameter { @Override public String getUsage() { - return "[--iterations ITERATIONS] [--convergence_threshold CONVERGENCE_THRESHOLD]"; + return "[--iterations ITERATIONS] [--convergence_threshold CONVERGENCE_THRESHOLD] "; + } + + @Override + public boolean isHidden() { + return false; } @Override diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameter.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameter.java index 9dbac4b46b4a2283e61724d65f80f06e9eb85a75..a0fc0cf3aab06c7e6e23160237792dc80b4ec177 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameter.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameter.java @@ -39,6 +39,15 @@ public interface Parameter { */ String getUsage(); + /** + * A hidden parameter is parsed from the command-line configuration but is + * not printed in the usage string. This can be used for power-user options + * not displayed to the general user. + * + * @return whether this parameter should be hidden from standard usage + */ + boolean isHidden(); + /** * Read and parse the parameter value from command-line arguments. * diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java index a3991cf235b7725fa6fb953c172ecc096eb5f7ce..ba6f6afefd4b77d741fc4ab6a541036d8b0db39e 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java @@ -51,9 +51,11 @@ implements Parameterized { // print parameters as ordered list for (Parameter parameter : parameters) { - strBuilder - .append(parameter.getUsage()) - .append(" "); + if (!parameter.isHidden()) { + strBuilder + .append(parameter.getUsage()) + .append(" "); + } } return strBuilder.toString(); diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/SimpleParameter.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/SimpleParameter.java index 93469ac92fc1acfb54202a44df65faebea78d017..170e691f6b0edbf385415de8903dcabb80b3a929 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/SimpleParameter.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/SimpleParameter.java @@ -65,6 +65,11 @@ implements Parameter { return hasDefaultValue ? "[" + option + "]" : option; } + @Override + public boolean isHidden() { + return name.startsWith("__"); + } + @Override public T getValue() { return value; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java index 4d9e48106c10cdad8821ac261eef64e8c4baac13..9fc937c3d7cc4a7620059be81f5d5e119ee2264b 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java @@ -67,7 +67,12 @@ implements Parameter { @Override public String getUsage() { - return "[--simplify ]"; + return "[--simplify ] "; + } + + @Override + public boolean isHidden() { + return false; } @Override diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java index 15f729311fd34436cbda44ac75799fff3e140fa7..a56a19bb91bb20cf94a1dc2d6ad157fea9066d8e 100644 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java @@ -60,16 +60,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x0000000000344448L; + checksum = 0x000000000001ae80L; break; case "long": - checksum = 0x0000000000a19d48L; + checksum = 0x0000000000053580L; break; case "string": case "nativeString": - checksum = 0x000000000c47ca48L; + checksum = 0x0000000000656880L; break; default: @@ -114,16 +114,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x000000000217bbe2L; + checksum = 0x0000000000113ca0L; break; case "long": - checksum = 0x0000000006788c22L; + checksum = 0x0000000000356460L; break; case "string": case "nativeString": - checksum = 0x000000007ddfd962L; + checksum = 0x00000000040f6f20L; break; default: @@ -158,16 +158,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x00000000001a2224L; + checksum = 0x000000000000d740L; break; case "long": - checksum = 0x000000000050cea4L; + checksum = 0x0000000000029ac0L; break; case "string": case "nativeString": - checksum = 0x000000000623e524L; + checksum = 0x000000000032b440L; break; default: @@ -202,16 +202,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x0000000000a9ddeaL; + checksum = 0x0000000000057720L; break; case "long": - checksum = 0x00000000020d3f2aL; + checksum = 0x000000000010ede0L; break; case "string": case "nativeString": - checksum = 0x0000000027e9516aL; + checksum = 0x00000000014993a0L; break; default: @@ -253,16 +253,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x00000000001ca34aL; + checksum = 0x000000000000eba0L; break; case "long": - checksum = 0x000000000071408aL; + checksum = 0x000000000003a660L; break; case "string": case "nativeString": - checksum = 0x00000000081ee80aL; + checksum = 0x0000000000430ee0L; break; default: @@ -297,16 +297,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x00000000035df180L; + checksum = 0x00000000001bc800L; break; case "long": - checksum = 0x0000000005a52180L; + checksum = 0x00000000002e9800L; break; case "string": case "nativeString": - checksum = 0x0000000273474480L; + checksum = 0x00000000143c1500L; break; default: @@ -341,16 +341,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x00000000001982daL; + checksum = 0x000000000000d220L; break; case "long": - checksum = 0x00000000004ee21aL; + checksum = 0x0000000000028ae0L; break; case "string": case "nativeString": - checksum = 0x00000000060a065aL; + checksum = 0x000000000031dea0L; break; default: @@ -385,16 +385,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x0000000003bf67f7L; + checksum = 0x00000000001ee529L; break; case "long": - checksum = 0x0000000008f467f7L; + checksum = 0x000000000049e529L; break; case "string": case "nativeString": - checksum = 0x00000001660861bdL; + checksum = 0x000000000b8c9aa3L; break; default: @@ -429,16 +429,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x00000000029aafb3L; + checksum = 0x00000000001579bdL; break; case "long": - checksum = 0x000000000592e9b3L; + checksum = 0x00000000002dffbdL; break; case "string": case "nativeString": - checksum = 0x000000011b079691L; + checksum = 0x0000000009213f1fL; break; default: @@ -473,16 +473,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x0000000004627ab6L; + checksum = 0x0000000000242920L; break; case "long": - checksum = 0x0000000009193576L; + checksum = 0x00000000004b1660L; break; case "string": case "nativeString": - checksum = 0x00000001e9adcf56L; + checksum = 0x000000000fcbc080L; break; default: @@ -517,16 +517,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x000000000034d5a4L; + checksum = 0x000000000001b3c0L; break; case "long": - checksum = 0x00000000006b8224L; + checksum = 0x0000000000037740L; break; case "string": case "nativeString": - checksum = 0x000000000757c6a4L; + checksum = 0x00000000003ca2c0L; break; default: @@ -561,16 +561,16 @@ extends DriverBaseITCase { case "integer": case "nativeInteger": case "nativeLong": - checksum = 0x00000000000d195aL; + checksum = 0x0000000000006ba0L; break; case "long": - checksum = 0x000000000042789aL; + checksum = 0x0000000000022460L; break; case "string": case "nativeString": - checksum = 0x00000000032f0adaL; + checksum = 0x00000000001a4a20L; break; default: