diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 55be728647f2e9aef33be0c28f8df878e2ead9b7..1b79cf52ad7b0e5df9e195d80ed0a5ec0f103509 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1120,17 +1120,30 @@ public class Graph & Serializable, VV extends Serializab } /** - * Runs a Vertex-Centric iteration on the graph. + * Create a Vertex-Centric iteration on the graph. * * @param vertexUpdateFunction the vertex update function * @param messagingFunction the messaging function * @param maximumNumberOfIterations maximum number of iterations to perform * @return */ - public Graph runVertexCentricIteration(VertexUpdateFunction vertexUpdateFunction, - MessagingFunction messagingFunction, int maximumNumberOfIterations) { - DataSet> newVertices = vertices.runOperation(VertexCentricIteration - .withEdges(edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); + public VertexCentricIteration createVertexCentricIteration( + VertexUpdateFunction vertexUpdateFunction, + MessagingFunction messagingFunction, + int maximumNumberOfIterations) { + return VertexCentricIteration.withEdges(edges, vertexUpdateFunction, + messagingFunction, maximumNumberOfIterations); + } + + /** + * Runs a Vertex-Centric iteration on the graph. + * + * @param iteration the Vertex-Centric iteration to run + * @return + */ + public Graph runVertexCentricIteration( + VertexCentricIteration iteration) { + DataSet> newVertices = vertices.runOperation(iteration); return new Graph(newVertices, this.edges, this.context); } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java index 69d7713c6c4ccac54e9674e5843a3b3367096d96..33a04e71fe9c2bfbfae42dd4dc10711dab701492 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -22,6 +22,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.types.NullValue; @@ -55,8 +56,9 @@ public class LabelPropagation & Serializable> // iteratively adopt the most frequent label among the neighbors // of each vertex - return input.runVertexCentricIteration(new UpdateVertexLabel(), - new SendNewLabelToNeighbors(), maxIterations); + VertexCentricIteration iteration = input.createVertexCentricIteration( + new UpdateVertexLabel(), new SendNewLabelToNeighbors(), maxIterations); + return input.runVertexCentricIteration(iteration); } /** diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java index 39b8ef1a88554e17cb6f0a98bdfcc3d95030dd74..46f725fa9f98ac4c63a72bf6e3f425f1e4acf352 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java @@ -25,6 +25,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; public class PageRank & Serializable> implements @@ -42,8 +43,10 @@ public class PageRank & Serializable> implements @Override public Graph run(Graph network) { - return network.runVertexCentricIteration(new VertexRankUpdater( - numVertices, beta), new RankMessenger(), maxIterations); + VertexCentricIteration iteration = network.createVertexCentricIteration( + new VertexRankUpdater(numVertices, beta), new RankMessenger(), maxIterations); + + return network.runVertexCentricIteration(iteration); } /** diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 2f575e78a891ffe654e2154797f2a4ceff1b14ae..f0690000535616a72282a014499557638783dbcc 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -25,6 +25,7 @@ import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import java.io.Serializable; @@ -44,9 +45,11 @@ public class SingleSourceShortestPaths & Serializable> @Override public Graph run(Graph input) { - return input.mapVertices(new InitVerticesMapper(srcVertexId)) - .runVertexCentricIteration(new VertexDistanceUpdater(), - new MinDistanceMessenger(), maxIterations); + VertexCentricIteration iteration = input.mapVertices( + new InitVerticesMapper(srcVertexId)).createVertexCentricIteration(new VertexDistanceUpdater(), + new MinDistanceMessenger(), maxIterations); + + return input.runVertexCentricIteration(iteration); } public static final class InitVerticesMapper & Serializable> diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java index 380e027a153ccfdd356ba5f1754c4b6f411d1f8c..8fb9a1162ba8a90a8d762d82fec4fa177aab717c 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.test.testdata.ConnectedComponentsData; import org.apache.flink.test.util.JavaProgramTestBase; @@ -64,7 +65,8 @@ public class VertexCentricConnectedComponentsITCase extends JavaProgramTestBase DataSet> initialVertices = vertexIds.map(new IdAssigner()); Graph graph = Graph.fromDataSet(initialVertices, edges, env); - Graph result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100); + VertexCentricIteration iteration = graph.createVertexCentricIteration(new CCUpdater(), new CCMessager(), 100); + Graph result = graph.runVertexCentricIteration(iteration); result.getVertices().writeAsCsv(resultPath, "\n", " "); env.execute();