提交 5ae84273 编写于 作者: V vasia

[gelly] made the number of vertices an optional parameter of PageRank; added...

[gelly] made the number of vertices an optional parameter of PageRank; added the edge weight initialization to the library methods
上级 8f35988f
......@@ -1145,7 +1145,6 @@ public class Graph<K, VV, EV> {
* @return the new graph containing the existing vertices and edges plus the
* newly added edge
*/
@SuppressWarnings("unchecked")
public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) {
Graph<K, VV, EV> partialGraph = fromCollection(Arrays.asList(source, target),
Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)),
......
......@@ -18,6 +18,9 @@
package org.apache.flink.graph.library;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.gsa.ApplyFunction;
......@@ -27,23 +30,46 @@ import org.apache.flink.graph.gsa.SumFunction;
/**
* This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
* The user can define the damping factor and the maximum number of iterations.
* If the number of vertices of the input graph is known, it should be provided as a parameter
* to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
*
* The implementation assumes that each page has at least one incoming and one outgoing link.
*/
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double> {
/** The damping factor. */
private double beta;
/** The maximum number of iterations. */
private int maxIterations;
/** The number of vertices of the input graph; when this is 0, run() counts the vertices itself. */
private long numberOfVertices;
/**
 * Creates a PageRank algorithm for a graph whose number of vertices is not known up front.
 * The vertex count is left at 0, which makes run() count the vertices itself.
 *
 * @param beta the damping factor
 * @param maxIterations the maximum number of iterations
 */
public GSAPageRank(double beta, int maxIterations) {
	// 0 is the "vertex count not provided" marker checked by run().
	this(beta, 0L, maxIterations);
}
/**
 * Creates a PageRank algorithm for a graph whose number of vertices is already known.
 *
 * @param beta the damping factor
 * @param numVertices the number of vertices of the input graph; 0 makes run() count them
 * @param maxIterations the maximum number of iterations
 */
public GSAPageRank(double beta, long numVertices, int maxIterations) {
	this.numberOfVertices = numVertices;
	this.maxIterations = maxIterations;
	this.beta = beta;
}
@Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
final long numberOfVertices = network.numberOfVertices();
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
}
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
return network.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
new UpdateRanks<K>(beta, numberOfVertices), maxIterations);
}
......@@ -97,4 +123,11 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double> {
setResult((1-beta)/numVertices + beta * rankSum);
}
}
@SuppressWarnings("serial")
private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
public Double map(Tuple2<Double, Long> value) {
return value.f0 / value.f1;
}
}
}
......@@ -18,6 +18,9 @@
package org.apache.flink.graph.library;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
......@@ -28,23 +31,52 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
/**
* This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
* The user can define the damping factor and the maximum number of iterations.
* If the number of vertices of the input graph is known, it should be provided as a parameter
* to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
*
* The implementation assumes that each page has at least one incoming and one outgoing link.
*/
public class PageRank<K> implements GraphAlgorithm<K, Double, Double> {
/** The damping factor. */
private double beta;
/** The maximum number of iterations. */
private int maxIterations;
/** The number of vertices of the input graph; when this is 0, run() counts the vertices itself. */
private long numberOfVertices;
/**
 * Creates a PageRank algorithm for a graph whose number of vertices is not known up front.
 * The vertex count is set to 0, which makes run() count the vertices itself.
 *
 * @param beta the damping factor
 * @param maxIterations the maximum number of iterations
 */
public PageRank(double beta, int maxIterations) {
	// 0 is the "vertex count not provided" marker checked by run().
	this(beta, 0L, maxIterations);
}
/**
 * Creates a PageRank algorithm for a graph whose number of vertices is already known.
 *
 * @param beta the damping factor
 * @param numVertices the number of vertices in the input; 0 makes run() count them
 * @param maxIterations the maximum number of iterations
 */
public PageRank(double beta, long numVertices, int maxIterations) {
	this.numberOfVertices = numVertices;
	this.maxIterations = maxIterations;
	this.beta = beta;
}
@Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
final long numberOfVertices = network.numberOfVertices();
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
}
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
new RankMessenger<K>(numberOfVertices), maxIterations);
}
......@@ -102,4 +134,12 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double> {
}
}
}
@SuppressWarnings("serial")
private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
public Double map(Tuple2<Double, Long> value) {
return value.f0 / value.f1;
}
}
}
......@@ -22,9 +22,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.utils.PageRankData;
......@@ -51,13 +49,8 @@ public class PageRankITCase extends MultipleProgramsTestBase {
Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
Graph<Long, Double, Double> networkWithWeights = inputGraph
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
List<Vertex<Long, Double>> result = networkWithWeights.run(new PageRank<Long>(0.85, 3))
List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
.getVertices().collect();
compareWithDelta(result, expectedResult, 0.01);
......@@ -69,13 +62,34 @@ public class PageRankITCase extends MultipleProgramsTestBase {
Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
Graph<Long, Double, Double> networkWithWeights = inputGraph
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
.getVertices().collect();
compareWithDelta(result, expectedResult, 0.01);
}
/**
 * Runs the vertex-centric PageRank for three iterations with damping factor 0.85 on the
 * default PageRank edge data set, passing the number of vertices (5) explicitly, and compares
 * the resulting vertex values with expectedResult using a delta of 0.01.
 */
@Test
public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	final Graph<Long, Double, Double> inputGraph =
			Graph.fromDataSet(PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);

	// Arguments are: damping factor, number of vertices, maximum number of iterations.
	final PageRank<Long> pageRank = new PageRank<Long>(0.85, 5, 3);
	final List<Vertex<Long, Double>> result = inputGraph.run(pageRank).getVertices().collect();

	compareWithDelta(result, expectedResult, 0.01);
}
@Test
public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List<Vertex<Long, Double>> result = networkWithWeights.run(new GSAPageRank<Long>(0.85, 3))
Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
.getVertices().collect();
compareWithDelta(result, expectedResult, 0.01);
......@@ -115,11 +129,4 @@ public class PageRankITCase extends MultipleProgramsTestBase {
return 1.0;
}
}
@SuppressWarnings("serial")
private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
public Double map(Tuple2<Double, Long> value) {
return value.f0 / value.f1;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册