提交 11897903 编写于 作者: M Martin Kiefer 提交者: Vasia Kalavri

[FLINK-1515] [gelly] Splitted runVertexCentricIteration into...

[FLINK-1515] [gelly] Splitted runVertexCentricIteration into createVertexIteration and runVertexIteration to make VertexCentricIteration object accessible to developers.
上级 40ea8a46
...@@ -1120,17 +1120,30 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1120,17 +1120,30 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
/** /**
* Runs a Vertex-Centric iteration on the graph. * Create a Vertex-Centric iteration on the graph.
* *
* @param vertexUpdateFunction the vertex update function * @param vertexUpdateFunction the vertex update function
* @param messagingFunction the messaging function * @param messagingFunction the messaging function
* @param maximumNumberOfIterations maximum number of iterations to perform * @param maximumNumberOfIterations maximum number of iterations to perform
* @return * @return
*/ */
public <M> Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction, public <M> VertexCentricIteration<K, VV, M, EV> createVertexCentricIteration(
MessagingFunction<K, VV, M, EV> messagingFunction, int maximumNumberOfIterations) { VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(VertexCentricIteration MessagingFunction<K, VV, M, EV> messagingFunction,
.withEdges(edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); int maximumNumberOfIterations) {
return VertexCentricIteration.withEdges(edges, vertexUpdateFunction,
messagingFunction, maximumNumberOfIterations);
}
/**
* Runs a Vertex-Centric iteration on the graph.
*
* @param iteration the Vertex-Centric iteration to run
* @return
*/
public <M> Graph<K, VV, EV> runVertexCentricIteration(
VertexCentricIteration<K, VV, M, EV> iteration) {
DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
return new Graph<K, VV, EV>(newVertices, this.edges, this.context); return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
} }
......
...@@ -22,6 +22,7 @@ import org.apache.flink.graph.Graph; ...@@ -22,6 +22,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.NullValue; import org.apache.flink.types.NullValue;
...@@ -55,8 +56,9 @@ public class LabelPropagation<K extends Comparable<K> & Serializable> ...@@ -55,8 +56,9 @@ public class LabelPropagation<K extends Comparable<K> & Serializable>
// iteratively adopt the most frequent label among the neighbors // iteratively adopt the most frequent label among the neighbors
// of each vertex // of each vertex
return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), VertexCentricIteration<K, Long, Long, NullValue> iteration = input.createVertexCentricIteration(
new SendNewLabelToNeighbors<K>(), maxIterations); new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(), maxIterations);
return input.runVertexCentricIteration(iteration);
} }
/** /**
......
...@@ -25,6 +25,7 @@ import org.apache.flink.graph.Graph; ...@@ -25,6 +25,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
public class PageRank<K extends Comparable<K> & Serializable> implements public class PageRank<K extends Comparable<K> & Serializable> implements
...@@ -42,8 +43,10 @@ public class PageRank<K extends Comparable<K> & Serializable> implements ...@@ -42,8 +43,10 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
@Override @Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> network) { public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
return network.runVertexCentricIteration(new VertexRankUpdater<K>( VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
numVertices, beta), new RankMessenger<K>(), maxIterations); new VertexRankUpdater<K>(numVertices, beta), new RankMessenger<K>(), maxIterations);
return network.runVertexCentricIteration(iteration);
} }
/** /**
......
...@@ -25,6 +25,7 @@ import org.apache.flink.graph.GraphAlgorithm; ...@@ -25,6 +25,7 @@ import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex; import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
import java.io.Serializable; import java.io.Serializable;
...@@ -44,9 +45,11 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> ...@@ -44,9 +45,11 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
@Override @Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> input) { public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
return input.mapVertices(new InitVerticesMapper<K>(srcVertexId)) VertexCentricIteration<K, Double, Double, Double> iteration = input.mapVertices(
.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new InitVerticesMapper<K>(srcVertexId)).createVertexCentricIteration(new VertexDistanceUpdater<K>(),
new MinDistanceMessenger<K>(), maxIterations); new MinDistanceMessenger<K>(), maxIterations);
return input.runVertexCentricIteration(iteration);
} }
public static final class InitVerticesMapper<K extends Comparable<K> & Serializable> public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
......
...@@ -30,6 +30,7 @@ import org.apache.flink.graph.Graph; ...@@ -30,6 +30,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex; import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.test.testdata.ConnectedComponentsData; import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.test.util.JavaProgramTestBase;
...@@ -64,7 +65,8 @@ public class VertexCentricConnectedComponentsITCase extends JavaProgramTestBase ...@@ -64,7 +65,8 @@ public class VertexCentricConnectedComponentsITCase extends JavaProgramTestBase
DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner()); DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100); VertexCentricIteration<Long, Long, Long, NullValue> iteration = graph.createVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(iteration);
result.getVertices().writeAsCsv(resultPath, "\n", " "); result.getVertices().writeAsCsv(resultPath, "\n", " ");
env.execute(); env.execute();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册