提交 ce2163e6 编写于 作者: V vasia

[gelly] removes generic type constraints

This closes #657
上级 ddb2b347
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
package org.apache.flink.graph; package org.apache.flink.graph;
import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
/** /**
...@@ -30,8 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple3; ...@@ -30,8 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
* @param <K> the key type for the sources and target vertices * @param <K> the key type for the sources and target vertices
* @param <V> the edge value type * @param <V> the edge value type
*/ */
public class Edge<K extends Comparable<K> & Serializable, V extends Serializable> public class Edge<K, V> extends Tuple3<K, K, V>{
extends Tuple3<K, K, V>{
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -32,8 +32,7 @@ import org.apache.flink.util.Collector; ...@@ -32,8 +32,7 @@ import org.apache.flink.util.Collector;
* @param <EV> the edge value type * @param <EV> the edge value type
* @param <O> the type of the return value * @param <O> the type of the return value
*/ */
public interface EdgesFunction<K extends Comparable<K> & Serializable, public interface EdgesFunction<K, EV, O> extends Function, Serializable {
EV extends Serializable, O> extends Function, Serializable {
void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception; void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
} }
...@@ -33,8 +33,7 @@ import org.apache.flink.util.Collector; ...@@ -33,8 +33,7 @@ import org.apache.flink.util.Collector;
* @param <EV> the edge value type * @param <EV> the edge value type
* @param <O> the type of the return value * @param <O> the type of the return value
*/ */
public interface EdgesFunctionWithVertexValue<K extends Comparable<K> & Serializable, public interface EdgesFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
VV extends Serializable, EV extends Serializable, O> extends Function, Serializable {
void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception; void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
} }
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.flink.graph; package org.apache.flink.graph;
import java.io.Serializable;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
...@@ -30,17 +29,13 @@ import org.apache.flink.api.common.functions.FilterFunction; ...@@ -30,17 +29,13 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
...@@ -77,7 +72,7 @@ import org.apache.flink.types.NullValue; ...@@ -77,7 +72,7 @@ import org.apache.flink.types.NullValue;
* @param <EV> the value type for edges * @param <EV> the value type for edges
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class Graph<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> { public class Graph<K, VV, EV> {
private final ExecutionEnvironment context; private final ExecutionEnvironment context;
private final DataSet<Vertex<K, VV>> vertices; private final DataSet<Vertex<K, VV>> vertices;
...@@ -104,9 +99,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -104,9 +99,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @param context the flink execution environment. * @param context the flink execution environment.
* @return the newly created graph. * @return the newly created graph.
*/ */
public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromCollection( public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices,
Collection<Vertex<K, VV>> vertices, Collection<Edge<K, EV>> edges, Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
ExecutionEnvironment context) {
return fromDataSet(context.fromCollection(vertices), return fromDataSet(context.fromCollection(vertices),
context.fromCollection(edges), context); context.fromCollection(edges), context);
...@@ -121,8 +115,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -121,8 +115,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @param context the flink execution environment. * @param context the flink execution environment.
* @return the newly created graph. * @return the newly created graph.
*/ */
public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> fromCollection( public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges,
Collection<Edge<K, EV>> edges, ExecutionEnvironment context) { ExecutionEnvironment context) {
return fromDataSet(context.fromCollection(edges), context); return fromDataSet(context.fromCollection(edges), context);
} }
...@@ -138,9 +132,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -138,9 +132,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @param context the flink execution environment. * @param context the flink execution environment.
* @return the newly created graph. * @return the newly created graph.
*/ */
public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromCollection( public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
Collection<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, final MapFunction<K, VV> mapper,ExecutionEnvironment context) {
ExecutionEnvironment context) {
return fromDataSet(context.fromCollection(edges), mapper, context); return fromDataSet(context.fromCollection(edges), mapper, context);
} }
...@@ -153,9 +146,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -153,9 +146,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @param context the flink execution environment. * @param context the flink execution environment.
* @return the newly created graph. * @return the newly created graph.
*/ */
public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromDataSet( public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
ExecutionEnvironment context) {
return new Graph<K, VV, EV>(vertices, edges, context); return new Graph<K, VV, EV>(vertices, edges, context);
} }
...@@ -169,7 +161,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -169,7 +161,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @param context the flink execution environment. * @param context the flink execution environment.
* @return the newly created graph. * @return the newly created graph.
*/ */
public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> fromDataSet( public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct(); DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct();
...@@ -177,8 +169,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -177,8 +169,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, NullValue, EV>(vertices, edges, context); return new Graph<K, NullValue, EV>(vertices, edges, context);
} }
private static final class EmitSrcAndTarget<K extends Comparable<K> & Serializable, EV extends Serializable> private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> { Edge<K, EV>, Vertex<K, NullValue>> {
public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) { public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance())); out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
...@@ -197,8 +189,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -197,8 +189,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @param context the flink execution environment. * @param context the flink execution environment.
* @return the newly created graph. * @return the newly created graph.
*/ */
public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromDataSet( public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Edge<K, EV>> edges,
DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, ExecutionEnvironment context) { final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
...@@ -220,8 +212,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -220,8 +212,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, VV, EV>(vertices, edges, context); return new Graph<K, VV, EV>(vertices, edges, context);
} }
private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> & Serializable, EV extends Serializable> private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction<
implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { Edge<K, EV>, Tuple1<K>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
out.collect(new Tuple1<K>(edge.f0)); out.collect(new Tuple1<K>(edge.f0));
...@@ -240,8 +232,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -240,8 +232,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @param context the flink execution environment. * @param context the flink execution environment.
* @return the newly created graph. * @return the newly created graph.
*/ */
public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromTupleDataSet( public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV>> vertices,
DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) { DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new Tuple2ToVertexMap<K, VV>()); DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new Tuple2ToVertexMap<K, VV>());
DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>()); DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
...@@ -259,8 +251,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -259,8 +251,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @param context the flink execution environment. * @param context the flink execution environment.
* @return the newly created graph. * @return the newly created graph.
*/ */
public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> fromTupleDataSet( public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) { ExecutionEnvironment context) {
DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>()); DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
return fromDataSet(edgeDataSet, context); return fromDataSet(edgeDataSet, context);
...@@ -278,8 +270,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -278,8 +270,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @param context the flink execution environment. * @param context the flink execution environment.
* @return the newly created graph. * @return the newly created graph.
*/ */
public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromTupleDataSet( public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
DataSet<Tuple3<K, K, EV>> edges, final MapFunction<K, VV> mapper, ExecutionEnvironment context) { final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>()); DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
return fromDataSet(edgeDataSet, mapper, context); return fromDataSet(edgeDataSet, mapper, context);
...@@ -367,7 +359,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -367,7 +359,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @return a new graph * @return a new graph
*/ */
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) { public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0); TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
...@@ -393,7 +385,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -393,7 +385,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @return a new graph * @return a new graph
*/ */
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) { public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) {
TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
...@@ -430,7 +422,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -430,7 +422,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context); return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context);
} }
private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> & Serializable, VV extends Serializable, T> private static final class ApplyCoGroupToVertexValues<K, VV, T>
implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> { implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
private MapFunction<Tuple2<VV, T>, VV> mapper; private MapFunction<Tuple2<VV, T>, VV> mapper;
...@@ -479,7 +471,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -479,7 +471,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
} }
private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> & Serializable, EV extends Serializable, T> private static final class ApplyCoGroupToEdgeValues<K, EV, T>
implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> { implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
private MapFunction<Tuple2<EV, T>, EV> mapper; private MapFunction<Tuple2<EV, T>, EV> mapper;
...@@ -530,7 +522,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -530,7 +522,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
} }
private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & Serializable, EV extends Serializable, T> private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> { implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
private MapFunction<Tuple2<EV, T>, EV> mapper; private MapFunction<Tuple2<EV, T>, EV> mapper;
...@@ -643,8 +635,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -643,8 +635,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
@ForwardedFieldsFirst("0->0;1->1;2->2") @ForwardedFieldsFirst("0->0;1->1;2->2")
private static final class ProjectEdge<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class ProjectEdge<K, VV, EV> implements FlatJoinFunction<
implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> { Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
public void join(Edge<K, EV> first, Vertex<K, VV> second, Collector<Edge<K, EV>> out) { public void join(Edge<K, EV> first, Vertex<K, VV> second, Collector<Edge<K, EV>> out) {
out.collect(first); out.collect(first);
} }
...@@ -660,7 +652,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -660,7 +652,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>()); return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
} }
private static final class CountNeighborsCoGroup<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class CountNeighborsCoGroup<K, VV, EV>
implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> { implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> {
@SuppressWarnings("unused") @SuppressWarnings("unused")
public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges, public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges,
...@@ -772,8 +764,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -772,8 +764,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ProjectVertexIdMap<K extends Comparable<K> & Serializable, EV extends Serializable> private static final class ProjectVertexIdMap<K, EV> implements MapFunction<
implements MapFunction<Edge<K, EV>, Tuple2<K, Edge<K, EV>>> { Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
private int fieldPosition; private int fieldPosition;
...@@ -787,8 +779,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -787,8 +779,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ProjectVertexWithEdgeValueMap<K extends Comparable<K> & Serializable, EV extends Serializable> private static final class ProjectVertexWithEdgeValueMap<K, EV> implements MapFunction<
implements MapFunction<Edge<K, EV>, Tuple2<K, EV>> { Edge<K, EV>, Tuple2<K, EV>> {
private int fieldPosition; private int fieldPosition;
...@@ -802,8 +794,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -802,8 +794,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ApplyGroupReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable, T> private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<
implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> { Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
private EdgesFunction<K, EV, T> function; private EdgesFunction<K, EV, T> function;
...@@ -821,32 +813,35 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -821,32 +813,35 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class EmitOneEdgePerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class EmitOneEdgePerNode<K, VV, EV> implements FlatMapFunction<
implements FlatMapFunction<Edge<K, EV>, Tuple2<K, Edge<K, EV>>> { Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) { public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge)); out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge));
out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge)); out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge));
} }
} }
private static final class EmitOneVertexWithEdgeValuePerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class EmitOneVertexWithEdgeValuePerNode<K, VV, EV> implements FlatMapFunction<
implements FlatMapFunction<Edge<K, EV>, Tuple2<K, EV>> { Edge<K, EV>, Tuple2<K, EV>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) { public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue())); out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue())); out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
} }
} }
private static final class EmitOneEdgeWithNeighborPerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class EmitOneEdgeWithNeighborPerNode<K, VV, EV> implements FlatMapFunction<
implements FlatMapFunction<Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> { Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) { public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge)); out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge));
out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge)); out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge));
} }
} }
private static final class ApplyCoGroupFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> private static final class ApplyCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, T>, ResultTypeQueryable<T> { Vertex<K, VV>, Edge<K, EV>, T>, ResultTypeQueryable<T> {
private EdgesFunctionWithVertexValue<K, VV, EV, T> function; private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
...@@ -866,7 +861,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -866,7 +861,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ApplyCoGroupFunctionOnAllEdges<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> private static final class ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>
implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> { implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
private EdgesFunctionWithVertexValue<K, VV, EV, T> function; private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
...@@ -915,7 +910,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -915,7 +910,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
@ForwardedFields("0->1;1->0;2->2") @ForwardedFields("0->1;1->0;2->2")
private static final class ReverseEdgesMap<K extends Comparable<K> & Serializable, EV extends Serializable> private static final class ReverseEdgesMap<K, EV>
implements MapFunction<Edge<K, EV>, Edge<K, EV>> { implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
public Edge<K, EV> map(Edge<K, EV> value) { public Edge<K, EV> map(Edge<K, EV> value) {
...@@ -955,7 +950,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -955,7 +950,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return vertices.map(new ExtractVertexIDMapper<K, VV>()); return vertices.map(new ExtractVertexIDMapper<K, VV>());
} }
private static final class ExtractVertexIDMapper<K extends Comparable<K> & Serializable, VV extends Serializable> private static final class ExtractVertexIDMapper<K, VV>
implements MapFunction<Vertex<K, VV>, K> { implements MapFunction<Vertex<K, VV>, K> {
@Override @Override
public K map(Vertex<K, VV> vertex) { public K map(Vertex<K, VV> vertex) {
...@@ -970,7 +965,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -970,7 +965,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return edges.map(new ExtractEdgeIDsMapper<K, EV>()); return edges.map(new ExtractEdgeIDsMapper<K, EV>());
} }
private static final class ExtractEdgeIDsMapper<K extends Comparable<K> & Serializable, EV extends Serializable> private static final class ExtractEdgeIDsMapper<K, EV>
implements MapFunction<Edge<K, EV>, Tuple2<K, K>> { implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
@Override @Override
public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception { public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
...@@ -978,67 +973,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -978,67 +973,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
/**
* Checks the weak connectivity of a graph.
*
* @param maxIterations
* the maximum number of iterations for the inner delta iteration
* @return true if the graph is weakly connected.
*/
public boolean isWeaklyConnected(int maxIterations) throws Exception {
// first, convert to an undirected graph
Graph<K, VV, EV> graph = this.getUndirected();
DataSet<K> vertexIds = graph.getVertexIds();
DataSet<Tuple2<K, K>> verticesWithInitialIds = vertexIds
.map(new DuplicateVertexIDMapper<K>());
DataSet<Tuple2<K, K>> edgeIds = graph.getEdgeIds();
DeltaIteration<Tuple2<K, K>, Tuple2<K, K>> iteration = verticesWithInitialIds
.iterateDelta(verticesWithInitialIds, maxIterations, 0);
DataSet<Tuple2<K, K>> changes = iteration.getWorkset()
.join(edgeIds, JoinHint.REPARTITION_SORT_MERGE)
.where(0).equalTo(0).with(new FindNeighborsJoin<K>())
.groupBy(0).aggregate(Aggregations.MIN, 1)
.join(iteration.getSolutionSet(), JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0)
.with(new VertexWithNewComponentJoin<K>());
DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes);
return components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()).count() == 1;
}
private static final class DuplicateVertexIDMapper<K> implements MapFunction<K, Tuple2<K, K>> {
@Override
public Tuple2<K, K> map(K k) {
return new Tuple2<K, K>(k, k);
}
}
private static final class FindNeighborsJoin<K> implements JoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>> {
@Override
public Tuple2<K, K> join(Tuple2<K, K> vertexWithComponent, Tuple2<K, K> edge) {
return new Tuple2<K, K>(edge.f1, vertexWithComponent.f1);
}
}
private static final class VertexWithNewComponentJoin<K extends Comparable<K>>
implements FlatJoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>> {
@Override
public void join(Tuple2<K, K> candidate, Tuple2<K, K> old, Collector<Tuple2<K, K>> out) {
if (candidate.f1.compareTo(old.f1) < 0) {
out.collect(candidate);
}
}
}
private static final class EmitFirstReducer<K> implements GroupReduceFunction<Tuple2<K, K>, Tuple2<K, K>> {
public void reduce(Iterable<Tuple2<K, K>> values, Collector<Tuple2<K, K>> out) {
out.collect(values.iterator().next());
}
}
/** /**
* Adds the input vertex and edges to the graph. If the vertex already * Adds the input vertex and edges to the graph. If the vertex already
* exists in the graph, it will not be added again, but the given edges * exists in the graph, it will not be added again, but the given edges
...@@ -1098,7 +1032,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1098,7 +1032,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, VV, EV>(newVertices, newEdges, this.context); return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
} }
private static final class RemoveVertexFilter<K extends Comparable<K> & Serializable, VV extends Serializable> private static final class RemoveVertexFilter<K, VV>
implements FilterFunction<Vertex<K, VV>> { implements FilterFunction<Vertex<K, VV>> {
private Vertex<K, VV> vertexToRemove; private Vertex<K, VV> vertexToRemove;
...@@ -1113,7 +1047,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1113,7 +1047,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class VertexRemovalEdgeFilter<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class VertexRemovalEdgeFilter<K, VV, EV>
implements FilterFunction<Edge<K, EV>> { implements FilterFunction<Edge<K, EV>> {
private Vertex<K, VV> vertexToRemove; private Vertex<K, VV> vertexToRemove;
...@@ -1147,7 +1081,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1147,7 +1081,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, VV, EV>(this.vertices, newEdges, this.context); return new Graph<K, VV, EV>(this.vertices, newEdges, this.context);
} }
private static final class EdgeRemovalEdgeFilter<K extends Comparable<K> & Serializable, EV extends Serializable> private static final class EdgeRemovalEdgeFilter<K, EV>
implements FilterFunction<Edge<K, EV>> { implements FilterFunction<Edge<K, EV>> {
private Edge<K, EV> edgeToRemove; private Edge<K, EV> edgeToRemove;
...@@ -1336,7 +1270,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1336,7 +1270,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ApplyNeighborGroupReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> private static final class ApplyNeighborGroupReduceFunction<K, VV, EV, T>
implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> { implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
private NeighborsFunction<K, VV, EV, T> function; private NeighborsFunction<K, VV, EV, T> function;
...@@ -1355,7 +1289,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1355,7 +1289,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ProjectVertexWithNeighborValueJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class ProjectVertexWithNeighborValueJoin<K, VV, EV>
implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple2<K, VV>> { implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple2<K, VV>> {
private int fieldPosition; private int fieldPosition;
...@@ -1371,8 +1305,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1371,8 +1305,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ProjectVertexIdJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class ProjectVertexIdJoin<K, VV, EV> implements FlatJoinFunction<
implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> { Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
private int fieldPosition; private int fieldPosition;
public ProjectVertexIdJoin(int position) { public ProjectVertexIdJoin(int position) {
this.fieldPosition = position; this.fieldPosition = position;
...@@ -1384,8 +1319,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1384,8 +1319,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ProjectNeighborValue<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class ProjectNeighborValue<K, VV, EV> implements FlatJoinFunction<
implements FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> { Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> {
public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor, public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
Collector<Tuple2<K, VV>> out) { Collector<Tuple2<K, VV>> out) {
...@@ -1394,8 +1329,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1394,8 +1329,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ProjectEdgeWithNeighbor<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> private static final class ProjectEdgeWithNeighbor<K, VV, EV> implements FlatJoinFunction<
implements FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> { Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor, public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) { Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
...@@ -1403,8 +1338,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1403,8 +1338,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ApplyNeighborCoGroupFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> private static final class ApplyNeighborCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> { Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
...@@ -1423,7 +1358,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1423,7 +1358,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
} }
private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> private static final class ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>
implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> { implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
...@@ -1513,8 +1448,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1513,8 +1448,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
@ForwardedFields("f0") @ForwardedFields("f0")
private static final class ApplyNeighborReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable> private static final class ApplyNeighborReduceFunction<K, VV> implements ReduceFunction<Tuple2<K, VV>> {
implements ReduceFunction<Tuple2<K, VV>> {
private ReduceNeighborsFunction<VV> function; private ReduceNeighborsFunction<VV> function;
...@@ -1561,8 +1495,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab ...@@ -1561,8 +1495,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
} }
@ForwardedFields("f0") @ForwardedFields("f0")
private static final class ApplyReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable> private static final class ApplyReduceFunction<K, EV> implements ReduceFunction<Tuple2<K, EV>> {
implements ReduceFunction<Tuple2<K, EV>> {
private ReduceEdgesFunction<EV> function; private ReduceEdgesFunction<EV> function;
......
...@@ -18,14 +18,12 @@ ...@@ -18,14 +18,12 @@
package org.apache.flink.graph; package org.apache.flink.graph;
import java.io.Serializable;
/** /**
* @param <K> key type * @param <K> key type
* @param <VV> vertex value type * @param <VV> vertex value type
* @param <EV> edge value type * @param <EV> edge value type
*/ */
public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> { public interface GraphAlgorithm<K, VV, EV> {
public Graph<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception; public Graph<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception;
} }
...@@ -34,8 +34,7 @@ import org.apache.flink.util.Collector; ...@@ -34,8 +34,7 @@ import org.apache.flink.util.Collector;
* @param <EV> the edge value type * @param <EV> the edge value type
* @param <O> the type of the return value * @param <O> the type of the return value
*/ */
public interface NeighborsFunction<K extends Comparable<K> & Serializable, VV extends Serializable, public interface NeighborsFunction<K, VV, EV, O> extends Function, Serializable {
EV extends Serializable, O> extends Function, Serializable {
void iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception; void iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
} }
...@@ -34,8 +34,7 @@ import org.apache.flink.util.Collector; ...@@ -34,8 +34,7 @@ import org.apache.flink.util.Collector;
* @param <EV> the edge value type * @param <EV> the edge value type
* @param <O> the type of the return value * @param <O> the type of the return value
*/ */
public interface NeighborsFunctionWithVertexValue<K extends Comparable<K> & Serializable, VV extends Serializable, public interface NeighborsFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
EV extends Serializable, O> extends Function, Serializable {
void iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception; void iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
} }
...@@ -28,7 +28,7 @@ import java.io.Serializable; ...@@ -28,7 +28,7 @@ import java.io.Serializable;
* *
* @param <EV> the edge value type * @param <EV> the edge value type
*/ */
public interface ReduceEdgesFunction<EV extends Serializable> extends Function, Serializable { public interface ReduceEdgesFunction<EV> extends Function, Serializable {
EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue); EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue);
} }
...@@ -29,7 +29,7 @@ import java.io.Serializable; ...@@ -29,7 +29,7 @@ import java.io.Serializable;
* *
* @param <VV> the vertex value type * @param <VV> the vertex value type
*/ */
public interface ReduceNeighborsFunction <VV extends Serializable> extends Function, Serializable { public interface ReduceNeighborsFunction <VV> extends Function, Serializable {
VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue); VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue);
} }
...@@ -20,8 +20,6 @@ package org.apache.flink.graph; ...@@ -20,8 +20,6 @@ package org.apache.flink.graph;
import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple5;
import java.io.Serializable;
/** /**
* A Triplet stores and retrieves the edges along with their corresponding source and target vertices. * A Triplet stores and retrieves the edges along with their corresponding source and target vertices.
* Triplets can be obtained from the input graph via the {@link org.apache.flink.graph.Graph#getTriplets()} method. * Triplets can be obtained from the input graph via the {@link org.apache.flink.graph.Graph#getTriplets()} method.
...@@ -30,8 +28,7 @@ import java.io.Serializable; ...@@ -30,8 +28,7 @@ import java.io.Serializable;
* @param <VV> the vertex value type * @param <VV> the vertex value type
* @param <EV> the edge value type * @param <EV> the edge value type
*/ */
public class Triplet <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> public class Triplet <K, VV, EV> extends Tuple5<K, K, VV, VV, EV> {
extends Tuple5<K, K, VV, VV, EV> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
package org.apache.flink.graph; package org.apache.flink.graph;
import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
/** /**
...@@ -29,8 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2; ...@@ -29,8 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
* @param <K> * @param <K>
* @param <V> * @param <V>
*/ */
public class Vertex<K extends Comparable<K> & Serializable, V extends Serializable> public class Vertex<K, V> extends Tuple2<K, V> {
extends Tuple2<K, V> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -25,8 +25,7 @@ import org.apache.flink.util.Collector; ...@@ -25,8 +25,7 @@ import org.apache.flink.util.Collector;
import java.io.Serializable; import java.io.Serializable;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public abstract class ApplyFunction<K extends Comparable<K> & Serializable, VV extends Serializable, M> public abstract class ApplyFunction<K, VV, M> implements Serializable {
implements Serializable {
public abstract void apply(M newValue, VV currentValue); public abstract void apply(M newValue, VV currentValue);
......
...@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext; ...@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
import java.io.Serializable; import java.io.Serializable;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable { public abstract class GatherFunction<VV, EV, M> implements Serializable {
public abstract M gather(Neighbor<VV, EV> neighbor); public abstract M gather(Neighbor<VV, EV> neighbor);
......
...@@ -39,8 +39,6 @@ import org.apache.flink.graph.Edge; ...@@ -39,8 +39,6 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex; import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.io.Serializable;
/** /**
* This class represents iterative graph computations, programmed in a gather-sum-apply perspective. * This class represents iterative graph computations, programmed in a gather-sum-apply perspective.
* *
...@@ -49,8 +47,7 @@ import java.io.Serializable; ...@@ -49,8 +47,7 @@ import java.io.Serializable;
* @param <EV> The type of the edge value in the graph * @param <EV> The type of the edge value in the graph
* @param <M> The intermediate type used by the gather, sum and apply functions * @param <M> The intermediate type used by the gather, sum and apply functions
*/ */
public class GatherSumApplyIteration<K extends Comparable<K> & Serializable, public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperation<Vertex<K, VV>,
VV extends Serializable, EV extends Serializable, M> implements CustomUnaryOperation<Vertex<K, VV>,
Vertex<K, VV>> { Vertex<K, VV>> {
private DataSet<Vertex<K, VV>> vertexDataSet; private DataSet<Vertex<K, VV>> vertexDataSet;
...@@ -159,10 +156,10 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable, ...@@ -159,10 +156,10 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
* *
* @return An in stance of the gather-sum-apply graph computation operator. * @return An in stance of the gather-sum-apply graph computation operator.
*/ */
public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M> public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges, withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather,
GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) {
int maximumNumberOfIterations) {
return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations); return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
} }
...@@ -172,8 +169,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable, ...@@ -172,8 +169,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
@SuppressWarnings("serial") @SuppressWarnings("serial")
@ForwardedFields("f0") @ForwardedFields("f0")
private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable, private static final class GatherUdf<K, VV, EV, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
EV extends Serializable, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> { Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
private final GatherFunction<VV, EV, M> gatherFunction; private final GatherFunction<VV, EV, M> gatherFunction;
...@@ -210,8 +206,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable, ...@@ -210,8 +206,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static final class SumUdf<K extends Comparable<K> & Serializable, VV extends Serializable, private static final class SumUdf<K, VV, EV, M> extends RichReduceFunction<Tuple2<K, M>>
EV extends Serializable, M> extends RichReduceFunction<Tuple2<K, M>>
implements ResultTypeQueryable<Tuple2<K, M>>{ implements ResultTypeQueryable<Tuple2<K, M>>{
private final SumFunction<VV, EV, M> sumFunction; private final SumFunction<VV, EV, M> sumFunction;
...@@ -249,8 +244,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable, ...@@ -249,8 +244,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static final class ApplyUdf<K extends Comparable<K> & Serializable, private static final class ApplyUdf<K, VV, EV, M> extends RichFlatJoinFunction<Tuple2<K, M>,
VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> { Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
private final ApplyFunction<K, VV, M> applyFunction; private final ApplyFunction<K, VV, M> applyFunction;
...@@ -289,8 +283,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable, ...@@ -289,8 +283,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
@SuppressWarnings("serial") @SuppressWarnings("serial")
@ForwardedFieldsSecond("f1->f0") @ForwardedFieldsSecond("f1->f0")
private static final class ProjectKeyWithNeighbor<K extends Comparable<K> & Serializable, private static final class ProjectKeyWithNeighbor<K, VV, EV> implements FlatJoinFunction<
VV extends Serializable, EV extends Serializable> implements FlatJoinFunction<
Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> { Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) { public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
......
...@@ -20,8 +20,6 @@ package org.apache.flink.graph.gsa; ...@@ -20,8 +20,6 @@ package org.apache.flink.graph.gsa;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import java.io.Serializable;
/** /**
* This class represents a <sourceVertex, edge> pair * This class represents a <sourceVertex, edge> pair
* This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
...@@ -29,8 +27,7 @@ import java.io.Serializable; ...@@ -29,8 +27,7 @@ import java.io.Serializable;
* @param <EV> the edge value type * @param <EV> the edge value type
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class Neighbor<VV extends Serializable, EV extends Serializable> public class Neighbor<VV, EV> extends Tuple2<VV, EV> {
extends Tuple2<VV, EV> {
public Neighbor() {} public Neighbor() {}
......
...@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext; ...@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
import java.io.Serializable; import java.io.Serializable;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public abstract class SumFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable { public abstract class SumFunction<VV, EV, M> implements Serializable {
public abstract M sum(M arg0, M arg1); public abstract M sum(M arg0, M arg1);
......
...@@ -25,7 +25,6 @@ import org.apache.flink.graph.spargel.MessagingFunction; ...@@ -25,7 +25,6 @@ import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.NullValue; import org.apache.flink.types.NullValue;
import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
...@@ -41,8 +40,7 @@ import java.util.Map.Entry; ...@@ -41,8 +40,7 @@ import java.util.Map.Entry;
* *
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class LabelPropagation<K extends Comparable<K> & Serializable> public class LabelPropagation<K> implements GraphAlgorithm<K, Long, NullValue> {
implements GraphAlgorithm<K, Long, NullValue> {
private final int maxIterations; private final int maxIterations;
...@@ -63,8 +61,7 @@ public class LabelPropagation<K extends Comparable<K> & Serializable> ...@@ -63,8 +61,7 @@ public class LabelPropagation<K extends Comparable<K> & Serializable>
* Function that updates the value of a vertex by adopting the most frequent * Function that updates the value of a vertex by adopting the most frequent
* label among its in-neighbors * label among its in-neighbors
*/ */
public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable> public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
extends VertexUpdateFunction<K, Long, Long> {
public void updateVertex(K vertexKey, Long vertexValue, public void updateVertex(K vertexKey, Long vertexValue,
MessageIterator<Long> inMessages) { MessageIterator<Long> inMessages) {
...@@ -105,8 +102,7 @@ public class LabelPropagation<K extends Comparable<K> & Serializable> ...@@ -105,8 +102,7 @@ public class LabelPropagation<K extends Comparable<K> & Serializable>
/** /**
* Sends the vertex label to all out-neighbors * Sends the vertex label to all out-neighbors
*/ */
public static final class SendNewLabelToNeighbors<K extends Comparable<K> & Serializable> public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
extends MessagingFunction<K, Long, Long, NullValue> {
public void sendMessages(K vertexKey, Long newLabel) { public void sendMessages(K vertexKey, Long newLabel) {
sendMessageToAllNeighbors(newLabel); sendMessageToAllNeighbors(newLabel);
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
package org.apache.flink.graph.library; package org.apache.flink.graph.library;
import java.io.Serializable;
import org.apache.flink.graph.Edge; import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph; import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.GraphAlgorithm;
...@@ -27,8 +25,7 @@ import org.apache.flink.graph.spargel.MessageIterator; ...@@ -27,8 +25,7 @@ import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
public class PageRank<K extends Comparable<K> & Serializable> implements public class PageRank<K> implements GraphAlgorithm<K, Double, Double> {
GraphAlgorithm<K, Double, Double> {
private double beta; private double beta;
private int maxIterations; private int maxIterations;
...@@ -51,8 +48,7 @@ public class PageRank<K extends Comparable<K> & Serializable> implements ...@@ -51,8 +48,7 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
* ranks from all incoming messages and then applying the dampening formula. * ranks from all incoming messages and then applying the dampening formula.
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public static final class VertexRankUpdater<K extends Comparable<K> & Serializable> public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
extends VertexUpdateFunction<K, Double, Double> {
private final double beta; private final double beta;
private final long numVertices; private final long numVertices;
...@@ -82,8 +78,7 @@ public class PageRank<K extends Comparable<K> & Serializable> implements ...@@ -82,8 +78,7 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
* value. * value.
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public static final class RankMessenger<K extends Comparable<K> & Serializable> public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
extends MessagingFunction<K, Double, Double, Double> {
private final long numVertices; private final long numVertices;
......
...@@ -27,11 +27,8 @@ import org.apache.flink.graph.spargel.MessageIterator; ...@@ -27,11 +27,8 @@ import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
import java.io.Serializable;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double> {
implements GraphAlgorithm<K, Double, Double> {
private final K srcVertexId; private final K srcVertexId;
private final Integer maxIterations; private final Integer maxIterations;
...@@ -49,8 +46,7 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> ...@@ -49,8 +46,7 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
maxIterations); maxIterations);
} }
public static final class InitVerticesMapper<K extends Comparable<K> & Serializable> public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
implements MapFunction<Vertex<K, Double>, Double> {
private K srcVertexId; private K srcVertexId;
...@@ -73,8 +69,7 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> ...@@ -73,8 +69,7 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
* *
* @param <K> * @param <K>
*/ */
public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable> public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
extends VertexUpdateFunction<K, Double, Double> {
@Override @Override
public void updateVertex(K vertexKey, Double vertexValue, public void updateVertex(K vertexKey, Double vertexValue,
...@@ -100,8 +95,7 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> ...@@ -100,8 +95,7 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
* *
* @param <K> * @param <K>
*/ */
public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable> public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
extends MessagingFunction<K, Double, Double, Double> {
@Override @Override
public void sendMessages(K vertexKey, Double newDistance) public void sendMessages(K vertexKey, Double newDistance)
......
...@@ -38,8 +38,7 @@ import org.apache.flink.util.Collector; ...@@ -38,8 +38,7 @@ import org.apache.flink.util.Collector;
* @param <Message> The type of the message sent between vertices along the edges. * @param <Message> The type of the message sent between vertices along the edges.
* @param <EdgeValue> The type of the values that are associated with the edges. * @param <EdgeValue> The type of the values that are associated with the edges.
*/ */
public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable, public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> implements Serializable {
VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -198,8 +197,7 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> ...@@ -198,8 +197,7 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>
this.edgesUsed = false; this.edgesUsed = false;
} }
private static final class EdgesIterator<VertexKey extends Comparable<VertexKey> & Serializable, private static final class EdgesIterator<VertexKey, EdgeValue>
EdgeValue extends Serializable>
implements Iterator<Edge<VertexKey, EdgeValue>>, Iterable<Edge<VertexKey, EdgeValue>> implements Iterator<Edge<VertexKey, EdgeValue>>, Iterable<Edge<VertexKey, EdgeValue>>
{ {
private Iterator<Edge<VertexKey, EdgeValue>> input; private Iterator<Edge<VertexKey, EdgeValue>> input;
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.flink.graph.spargel; package org.apache.flink.graph.spargel;
import java.io.Serializable;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
...@@ -69,8 +68,7 @@ import org.apache.flink.util.Collector; ...@@ -69,8 +68,7 @@ import org.apache.flink.util.Collector;
* @param <Message> The type of the message sent between vertices along the edges. * @param <Message> The type of the message sent between vertices along the edges.
* @param <EdgeValue> The type of the values that are associated with the edges. * @param <EdgeValue> The type of the values that are associated with the edges.
*/ */
public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Serializable, VertexValue extends Serializable, public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
Message, EdgeValue extends Serializable>
implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
{ {
private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction; private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
...@@ -218,8 +216,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se ...@@ -218,8 +216,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
* *
* @return An in stance of the vertex-centric graph computation operator. * @return An in stance of the vertex-centric graph computation operator.
*/ */
public static final <VertexKey extends Comparable<VertexKey> & Serializable, VertexValue extends Serializable, public static final <VertexKey, VertexValue, Message, EdgeValue>
Message, EdgeValue extends Serializable>
VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withEdges( VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withEdges(
DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue, DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue,
VertexUpdateFunction<VertexKey, VertexValue, Message> uf, VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
...@@ -233,8 +230,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se ...@@ -233,8 +230,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
// Wrapping UDFs // Wrapping UDFs
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey> & Serializable, private static final class VertexUpdateUdf<VertexKey, VertexValue, Message>
VertexValue extends Serializable, Message>
extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>> implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
{ {
...@@ -308,8 +304,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se ...@@ -308,8 +304,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
/* /*
* UDF that encapsulates the message sending function for graphs where the edges have an associated value. * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
*/ */
private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey> & Serializable, private static final class MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>
VertexValue extends Serializable, Message, EdgeValue extends Serializable>
extends RichCoGroupFunction<Edge<VertexKey, EdgeValue>, Vertex<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> extends RichCoGroupFunction<Edge<VertexKey, EdgeValue>, Vertex<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
implements ResultTypeQueryable<Tuple2<VertexKey, Message>> implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
{ {
......
...@@ -36,8 +36,7 @@ import org.apache.flink.util.Collector; ...@@ -36,8 +36,7 @@ import org.apache.flink.util.Collector;
* <VertexValue> The vertex value type. * <VertexValue> The vertex value type.
* <Message> The message type. * <Message> The message type.
*/ */
public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey> & Serializable, public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> implements Serializable {
VertexValue extends Serializable, Message> implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -18,14 +18,11 @@ ...@@ -18,14 +18,11 @@
package org.apache.flink.graph.utils; package org.apache.flink.graph.utils;
import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge; import org.apache.flink.graph.Edge;
public class EdgeToTuple3Map<K extends Comparable<K> & Serializable, public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> {
EV extends Serializable> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -18,14 +18,11 @@ ...@@ -18,14 +18,11 @@
package org.apache.flink.graph.utils; package org.apache.flink.graph.utils;
import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex; import org.apache.flink.graph.Vertex;
public class Tuple2ToVertexMap<K extends Comparable<K> & Serializable, public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> {
VV extends Serializable> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
package org.apache.flink.graph.utils; package org.apache.flink.graph.utils;
import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge; import org.apache.flink.graph.Edge;
...@@ -30,8 +28,7 @@ import org.apache.flink.graph.Edge; ...@@ -30,8 +28,7 @@ import org.apache.flink.graph.Edge;
* @param <K> * @param <K>
* @param <EV> * @param <EV>
*/ */
public class Tuple3ToEdgeMap<K extends Comparable<K> & Serializable, public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>> {
EV extends Serializable> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -18,14 +18,11 @@ ...@@ -18,14 +18,11 @@
package org.apache.flink.graph.utils; package org.apache.flink.graph.utils;
import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex; import org.apache.flink.graph.Vertex;
public class VertexToTuple2Map<K extends Comparable<K> & Serializable, public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> {
VV extends Serializable> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -30,8 +30,7 @@ import org.apache.flink.graph.Graph; ...@@ -30,8 +30,7 @@ import org.apache.flink.graph.Graph;
* @param <EV> the edge value type * @param <EV> the edge value type
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> public abstract class GraphValidator<K, VV, EV> implements Serializable {
implements Serializable {
public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception; public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
......
...@@ -28,11 +28,8 @@ import org.apache.flink.graph.Graph; ...@@ -28,11 +28,8 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex; import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.io.Serializable;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, EV> {
extends GraphValidator<K, VV, EV> {
/** /**
* Checks that the edge set input contains valid vertex Ids, i.e. that they * Checks that the edge set input contains valid vertex Ids, i.e. that they
...@@ -51,16 +48,14 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V ...@@ -51,16 +48,14 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
return invalidIds.map(new KToTupleMap<K>()).count() == 0; return invalidIds.map(new KToTupleMap<K>()).count() == 0;
} }
private static final class MapEdgeIds<K extends Comparable<K> & Serializable, EV extends Serializable> private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
out.collect(new Tuple1<K>(edge.f0)); out.collect(new Tuple1<K>(edge.f0));
out.collect(new Tuple1<K>(edge.f1)); out.collect(new Tuple1<K>(edge.f1));
} }
} }
private static final class GroupInvalidIds<K extends Comparable<K> & Serializable, VV extends Serializable> private static final class GroupInvalidIds<K, VV> implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
public void coGroup(Iterable<Vertex<K, VV>> vertexId, public void coGroup(Iterable<Vertex<K, VV>> vertexId,
Iterable<Tuple1<K>> edgeId, Collector<K> out) { Iterable<Tuple1<K>> edgeId, Collector<K> out) {
if (!(vertexId.iterator().hasNext())) { if (!(vertexId.iterator().hasNext())) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.test;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
public WeaklyConnectedITCase(TestExecutionMode mode){
super(mode);
}
private String resultPath;
private String expectedResult;
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@Before
public void before() throws Exception{
resultPath = tempFolder.newFile().toURI().toString();
}
@After
public void after() throws Exception{
compareResultsByLinesInMemory(expectedResult, resultPath);
}
@Test
public void testWithConnectedDirected() throws Exception {
/*
* Test isWeaklyConnected() with a connected, directed graph
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
env.execute();
expectedResult = "true\n";
}
@Test
public void testWithDisconnectedDirected() throws Exception {
/*
* Test isWeaklyConnected() with a disconnected, directed graph
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
env.execute();
expectedResult = "false\n";
}
@Test
public void testWithConnectedUndirected() throws Exception {
/*
* Test isWeaklyConnected() with a connected, undirected graph
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env).getUndirected();
env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
env.execute();
expectedResult = "true\n";
}
@Test
public void testWithDisconnectedUndirected() throws Exception {
/*
* Test isWeaklyConnected() with a disconnected, undirected graph
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();
env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
env.execute();
expectedResult = "false\n";
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册