From 118979031bc5e05981ee6da7a9fb26fe675293df Mon Sep 17 00:00:00 2001 From: Martin Kiefer Date: Sun, 15 Feb 2015 18:07:52 +0100 Subject: [PATCH] [FLINK-1515] [gelly] Splitted runVertexCentricIteration into createVertexIteration and runVertexIteration to make VertexCentricIteration object accessible to developers. --- .../java/org/apache/flink/graph/Graph.java | 23 +++++++++++++++---- .../flink/graph/library/LabelPropagation.java | 6 +++-- .../apache/flink/graph/library/PageRank.java | 7 ++++-- .../library/SingleSourceShortestPaths.java | 9 +++++--- ...ertexCentricConnectedComponentsITCase.java | 4 +++- 5 files changed, 36 insertions(+), 13 deletions(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 55be728647f..1b79cf52ad7 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1120,17 +1120,30 @@ public class Graph & Serializable, VV extends Serializab } /** - * Runs a Vertex-Centric iteration on the graph. + * Create a Vertex-Centric iteration on the graph. * * @param vertexUpdateFunction the vertex update function * @param messagingFunction the messaging function * @param maximumNumberOfIterations maximum number of iterations to perform * @return */ - public Graph runVertexCentricIteration(VertexUpdateFunction vertexUpdateFunction, - MessagingFunction messagingFunction, int maximumNumberOfIterations) { - DataSet> newVertices = vertices.runOperation(VertexCentricIteration - .withEdges(edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); + public VertexCentricIteration createVertexCentricIteration( + VertexUpdateFunction vertexUpdateFunction, + MessagingFunction messagingFunction, + int maximumNumberOfIterations) { + return VertexCentricIteration.withEdges(edges, vertexUpdateFunction, + messagingFunction, maximumNumberOfIterations); + } + + /** + * Runs a Vertex-Centric iteration on the graph. + * + * @param iteration the Vertex-Centric iteration to run + * @return + */ + public Graph runVertexCentricIteration( + VertexCentricIteration iteration) { + DataSet> newVertices = vertices.runOperation(iteration); return new Graph(newVertices, this.edges, this.context); } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java index 69d7713c6c4..33a04e71fe9 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -22,6 +22,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.types.NullValue; @@ -55,8 +56,9 @@ public class LabelPropagation & Serializable> // iteratively adopt the most frequent label among the neighbors // of each vertex - return input.runVertexCentricIteration(new UpdateVertexLabel(), - new SendNewLabelToNeighbors(), maxIterations); + VertexCentricIteration iteration = input.createVertexCentricIteration( + new UpdateVertexLabel(), new SendNewLabelToNeighbors(), maxIterations); + return input.runVertexCentricIteration(iteration); } /** diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java index 39b8ef1a885..46f725fa9f9 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java @@ -25,6 +25,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; public class PageRank & Serializable> implements @@ -42,8 +43,10 @@ public class PageRank & Serializable> implements @Override public Graph run(Graph network) { - return network.runVertexCentricIteration(new VertexRankUpdater( - numVertices, beta), new RankMessenger(), maxIterations); + VertexCentricIteration iteration = network.createVertexCentricIteration( + new VertexRankUpdater(numVertices, beta), new RankMessenger(), maxIterations); + + return network.runVertexCentricIteration(iteration); } /** diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 2f575e78a89..f0690000535 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -25,6 +25,7 @@ import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import java.io.Serializable; @@ -44,9 +45,11 @@ public class SingleSourceShortestPaths & Serializable> @Override public Graph run(Graph input) { - return input.mapVertices(new InitVerticesMapper(srcVertexId)) - .runVertexCentricIteration(new VertexDistanceUpdater(), - new MinDistanceMessenger(), maxIterations); + VertexCentricIteration iteration = input.mapVertices( + new InitVerticesMapper(srcVertexId)).createVertexCentricIteration(new VertexDistanceUpdater(), + new MinDistanceMessenger(), maxIterations); + + return input.runVertexCentricIteration(iteration); } public static final class InitVerticesMapper & Serializable> diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java index 380e027a153..8fb9a1162ba 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.test.testdata.ConnectedComponentsData; import org.apache.flink.test.util.JavaProgramTestBase; @@ -64,7 +65,8 @@ public class VertexCentricConnectedComponentsITCase extends JavaProgramTestBase DataSet> initialVertices = vertexIds.map(new IdAssigner()); Graph graph = Graph.fromDataSet(initialVertices, edges, env); - Graph result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100); + VertexCentricIteration iteration = graph.createVertexCentricIteration(new CCUpdater(), new CCMessager(), 100); + Graph result = graph.runVertexCentricIteration(iteration); result.getVertices().writeAsCsv(resultPath, "\n", " "); env.execute(); -- GitLab