提交 cf88407a 编写于 作者: V vasia 提交者: Andra Lungu

[FLINK-2563] [gelly] extended the run() method of GraphAlgorithm interface to...

[FLINK-2563] [gelly] extended the run() method of GraphAlgorithm interface to return an arbitrary type

This closes #1042
上级 c6391dbf
......@@ -649,8 +649,9 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
jtuple.f1))
}
def run(algorithm: GraphAlgorithm[K, VV, EV]) = {
wrapGraph(jgraph.run(algorithm))
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]):
T = {
jgraph.run(algorithm)
}
/**
......
......@@ -432,7 +432,6 @@ public class Graph<K, VV, EV> {
* @param returnType the explicit return type.
* @return a new graph
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
DataSet<Edge<K, NV>> mappedEdges = edges.map(
new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
......@@ -1451,7 +1450,13 @@ public class Graph<K, VV, EV> {
return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
}
public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) throws Exception {
/**
* @param algorithm the algorithm to run on the Graph
* @param <T> the return type
* @return the result of the graph algorithm
* @throws Exception
*/
public <T> T run(GraphAlgorithm<K, VV, EV, T> algorithm) throws Exception {
return algorithm.run(this);
}
......
......@@ -22,8 +22,9 @@ package org.apache.flink.graph;
* @param <K> key type
* @param <VV> vertex value type
* @param <EV> edge value type
* @param <T> the return type
*/
public interface GraphAlgorithm<K, VV, EV> {
public interface GraphAlgorithm<K, VV, EV, T> {
public Graph<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception;
public T run(Graph<K, VV, EV> input) throws Exception;
}
......@@ -80,8 +80,6 @@ public class ConnectedComponents implements ProgramDescription {
} else {
verticesWithMinIds.print();
}
}
@Override
......
......@@ -44,7 +44,8 @@ import java.util.TreeMap;
*
* @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
*/
public class CommunityDetection implements GraphAlgorithm<Long, Long, Double> {
public class CommunityDetection implements
GraphAlgorithm<Long, Long, Double, Graph<Long, Long, Double>> {
private Integer maxIterations;
......
......@@ -37,7 +37,8 @@ import org.apache.flink.types.NullValue;
* is reached.
*/
@SuppressWarnings("serial")
public class ConnectedComponents implements GraphAlgorithm<Long, Long, NullValue>{
public class ConnectedComponents implements
GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> {
private Integer maxIterations;
......
......@@ -29,7 +29,8 @@ import org.apache.flink.types.NullValue;
/**
* This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
*/
public class GSAConnectedComponents implements GraphAlgorithm<Long, Long, NullValue> {
public class GSAConnectedComponents implements
GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> {
private Integer maxIterations;
......
......@@ -36,7 +36,7 @@ import org.apache.flink.graph.gsa.SumFunction;
*
* The implementation assumes that each page has at least one incoming and one outgoing link.
*/
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double> {
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
private double beta;
private int maxIterations;
......
......@@ -30,7 +30,8 @@ import org.apache.flink.graph.gsa.Neighbor;
/**
* This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
*/
public class GSASingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double> {
public class GSASingleSourceShortestPaths<K> implements
GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
private final K srcVertexId;
private final Integer maxIterations;
......
......@@ -42,7 +42,8 @@ import java.util.Map.Entry;
*/
@SuppressWarnings("serial")
public class LabelPropagation<K extends Comparable<K>> implements GraphAlgorithm<K, Long, NullValue> {
public class LabelPropagation<K extends Comparable<K>> implements
GraphAlgorithm<K, Long, NullValue, Graph<K, Long, NullValue>> {
private final int maxIterations;
......
......@@ -37,7 +37,8 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
*
* The implementation assumes that each page has at least one incoming and one outgoing link.
*/
public class PageRank<K> implements GraphAlgorithm<K, Double, Double> {
public class PageRank<K> implements
GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
private double beta;
private int maxIterations;
......
......@@ -31,7 +31,8 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
* This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
*/
@SuppressWarnings("serial")
public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double> {
public class SingleSourceShortestPaths<K> implements
GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
private final K srcVertexId;
private final Integer maxIterations;
......
......@@ -369,6 +369,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
}
}
@SuppressWarnings("serial")
private static final class GetReachableVertices extends GatherFunction<HashSet<Long>, Long, HashSet<Long>> {
@Override
......@@ -377,6 +378,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
}
}
@SuppressWarnings("serial")
private static final class FindAllReachableVertices extends SumFunction<HashSet<Long>, Long, HashSet<Long>> {
@Override
public HashSet<Long> sum(HashSet<Long> newSet, HashSet<Long> currentSet) {
......@@ -388,6 +390,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
}
}
@SuppressWarnings("serial")
private static final class UpdateReachableVertices extends ApplyFunction<Long, HashSet<Long>, HashSet<Long>> {
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册