diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java index 1bef5b1e3ed6c56412c3fa7a35b1081068b2d4ac..d84badb511a920cacc20139a58ff7468b688e008 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java @@ -18,8 +18,6 @@ package org.apache.flink.graph; -import java.io.Serializable; - import org.apache.flink.api.java.tuple.Tuple3; /** @@ -30,8 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple3; * @param the key type for the sources and target vertices * @param the edge value type */ -public class Edge & Serializable, V extends Serializable> - extends Tuple3{ +public class Edge extends Tuple3{ private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java index aac63db7fb14ce4aae4fda7b5c75775859136755..bf1d6a2245afde7e358f54d0db82e4375313be7a 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java @@ -32,8 +32,7 @@ import org.apache.flink.util.Collector; * @param the edge value type * @param the type of the return value */ -public interface EdgesFunction & Serializable, - EV extends Serializable, O> extends Function, Serializable { +public interface EdgesFunction extends Function, Serializable { void iterateEdges(Iterable>> edges, Collector out) throws Exception; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java index f4f43208e56c3ce3a6ab851c1ad80c80e405e59b..0b0ab0e65286b6af245618aa2f5828092c834278 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java @@ -33,8 +33,7 @@ import org.apache.flink.util.Collector; * @param the edge value type * @param the type of the return value */ -public interface EdgesFunctionWithVertexValue & Serializable, - VV extends Serializable, EV extends Serializable, O> extends Function, Serializable { +public interface EdgesFunctionWithVertexValue extends Function, Serializable { void iterateEdges(Vertex v, Iterable> edges, Collector out) throws Exception; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 1c0052d5a8806075e5505f1cb3a91240d5ea4df7..490658c7be974f572dc96920489c4ff09dff722f 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -18,7 +18,6 @@ package org.apache.flink.graph; -import java.io.Serializable; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; @@ -30,17 +29,13 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; -import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -77,7 +72,7 @@ import org.apache.flink.types.NullValue; * @param the value type for edges */ @SuppressWarnings("serial") -public class Graph & Serializable, VV extends Serializable, EV extends Serializable> { +public class Graph { private final ExecutionEnvironment context; private final DataSet> vertices; @@ -104,9 +99,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromCollection( - Collection> vertices, Collection> edges, - ExecutionEnvironment context) { + public static Graph fromCollection(Collection> vertices, + Collection> edges, ExecutionEnvironment context) { return fromDataSet(context.fromCollection(vertices), context.fromCollection(edges), context); @@ -121,8 +115,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, EV extends Serializable> Graph fromCollection( - Collection> edges, ExecutionEnvironment context) { + public static Graph fromCollection(Collection> edges, + ExecutionEnvironment context) { return fromDataSet(context.fromCollection(edges), context); } @@ -138,9 +132,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromCollection( - Collection> edges, final MapFunction mapper, - ExecutionEnvironment context) { + public static Graph fromCollection(Collection> edges, + final MapFunction mapper,ExecutionEnvironment context) { return fromDataSet(context.fromCollection(edges), mapper, context); } @@ -153,9 +146,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromDataSet( - DataSet> vertices, DataSet> edges, - ExecutionEnvironment context) { + public static Graph fromDataSet(DataSet> vertices, + DataSet> edges, ExecutionEnvironment context) { return new Graph(vertices, edges, context); } @@ -169,7 +161,7 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, EV extends Serializable> Graph fromDataSet( + public static Graph fromDataSet( DataSet> edges, ExecutionEnvironment context) { DataSet> vertices = edges.flatMap(new EmitSrcAndTarget()).distinct(); @@ -177,8 +169,8 @@ public class Graph & Serializable, VV extends Serializab return new Graph(vertices, edges, context); } - private static final class EmitSrcAndTarget & Serializable, EV extends Serializable> - implements FlatMapFunction, Vertex> { + private static final class EmitSrcAndTarget implements FlatMapFunction< + Edge, Vertex> { public void flatMap(Edge edge, Collector> out) { out.collect(new Vertex(edge.f0, NullValue.getInstance())); @@ -197,8 +189,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromDataSet( - DataSet> edges, final MapFunction mapper, ExecutionEnvironment context) { + public static Graph fromDataSet(DataSet> edges, + final MapFunction mapper, ExecutionEnvironment context) { TypeInformation keyType = ((TupleTypeInfo) edges.getType()).getTypeAt(0); @@ -220,8 +212,8 @@ public class Graph & Serializable, VV extends Serializab return new Graph(vertices, edges, context); } - private static final class EmitSrcAndTargetAsTuple1 & Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple1> { + private static final class EmitSrcAndTargetAsTuple1 implements FlatMapFunction< + Edge, Tuple1> { public void flatMap(Edge edge, Collector> out) { out.collect(new Tuple1(edge.f0)); @@ -240,8 +232,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromTupleDataSet( - DataSet> vertices, DataSet> edges, ExecutionEnvironment context) { + public static Graph fromTupleDataSet(DataSet> vertices, + DataSet> edges, ExecutionEnvironment context) { DataSet> vertexDataSet = vertices.map(new Tuple2ToVertexMap()); DataSet> edgeDataSet = edges.map(new Tuple3ToEdgeMap()); @@ -259,8 +251,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, EV extends Serializable> Graph fromTupleDataSet( - DataSet> edges, ExecutionEnvironment context) { + public static Graph fromTupleDataSet(DataSet> edges, + ExecutionEnvironment context) { DataSet> edgeDataSet = edges.map(new Tuple3ToEdgeMap()); return fromDataSet(edgeDataSet, context); @@ -278,8 +270,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromTupleDataSet( - DataSet> edges, final MapFunction mapper, ExecutionEnvironment context) { + public static Graph fromTupleDataSet(DataSet> edges, + final MapFunction mapper, ExecutionEnvironment context) { DataSet> edgeDataSet = edges.map(new Tuple3ToEdgeMap()); return fromDataSet(edgeDataSet, mapper, context); @@ -367,7 +359,7 @@ public class Graph & Serializable, VV extends Serializab * @return a new graph */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public Graph mapVertices(final MapFunction, NV> mapper) { + public Graph mapVertices(final MapFunction, NV> mapper) { TypeInformation keyType = ((TupleTypeInfo) vertices.getType()).getTypeAt(0); @@ -393,7 +385,7 @@ public class Graph & Serializable, VV extends Serializab * @return a new graph */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public Graph mapEdges(final MapFunction, NV> mapper) { + public Graph mapEdges(final MapFunction, NV> mapper) { TypeInformation keyType = ((TupleTypeInfo) edges.getType()).getTypeAt(0); @@ -430,7 +422,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(resultedVertices, this.edges, this.context); } - private static final class ApplyCoGroupToVertexValues & Serializable, VV extends Serializable, T> + private static final class ApplyCoGroupToVertexValues implements CoGroupFunction, Tuple2, Vertex> { private MapFunction, VV> mapper; @@ -479,7 +471,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(this.vertices, resultedEdges, this.context); } - private static final class ApplyCoGroupToEdgeValues & Serializable, EV extends Serializable, T> + private static final class ApplyCoGroupToEdgeValues implements CoGroupFunction, Tuple3, Edge> { private MapFunction, EV> mapper; @@ -530,7 +522,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(this.vertices, resultedEdges, this.context); } - private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget & Serializable, EV extends Serializable, T> + private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget implements CoGroupFunction, Tuple2, Edge> { private MapFunction, EV> mapper; @@ -643,8 +635,8 @@ public class Graph & Serializable, VV extends Serializab } @ForwardedFieldsFirst("0->0;1->1;2->2") - private static final class ProjectEdge & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatJoinFunction, Vertex, Edge> { + private static final class ProjectEdge implements FlatJoinFunction< + Edge, Vertex, Edge> { public void join(Edge first, Vertex second, Collector> out) { out.collect(first); } @@ -660,7 +652,7 @@ public class Graph & Serializable, VV extends Serializab return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup()); } - private static final class CountNeighborsCoGroup & Serializable, VV extends Serializable, EV extends Serializable> + private static final class CountNeighborsCoGroup implements CoGroupFunction, Edge, Tuple2> { @SuppressWarnings("unused") public void coGroup(Iterable> vertex, Iterable> outEdges, @@ -772,8 +764,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectVertexIdMap & Serializable, EV extends Serializable> - implements MapFunction, Tuple2>> { + private static final class ProjectVertexIdMap implements MapFunction< + Edge, Tuple2>> { private int fieldPosition; @@ -787,8 +779,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectVertexWithEdgeValueMap & Serializable, EV extends Serializable> - implements MapFunction, Tuple2> { + private static final class ProjectVertexWithEdgeValueMap implements MapFunction< + Edge, Tuple2> { private int fieldPosition; @@ -802,8 +794,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyGroupReduceFunction & Serializable, EV extends Serializable, T> - implements GroupReduceFunction>, T>, ResultTypeQueryable { + private static final class ApplyGroupReduceFunction implements GroupReduceFunction< + Tuple2>, T>, ResultTypeQueryable { private EdgesFunction function; @@ -821,32 +813,35 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class EmitOneEdgePerNode & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple2>> { + private static final class EmitOneEdgePerNode implements FlatMapFunction< + Edge, Tuple2>> { + public void flatMap(Edge edge, Collector>> out) { out.collect(new Tuple2>(edge.getSource(), edge)); out.collect(new Tuple2>(edge.getTarget(), edge)); } } - private static final class EmitOneVertexWithEdgeValuePerNode & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple2> { + private static final class EmitOneVertexWithEdgeValuePerNode implements FlatMapFunction< + Edge, Tuple2> { + public void flatMap(Edge edge, Collector> out) { out.collect(new Tuple2(edge.getSource(), edge.getValue())); out.collect(new Tuple2(edge.getTarget(), edge.getValue())); } } - private static final class EmitOneEdgeWithNeighborPerNode & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple3>> { + private static final class EmitOneEdgeWithNeighborPerNode implements FlatMapFunction< + Edge, Tuple3>> { + public void flatMap(Edge edge, Collector>> out) { out.collect(new Tuple3>(edge.getSource(), edge.getTarget(), edge)); out.collect(new Tuple3>(edge.getTarget(), edge.getSource(), edge)); } } - private static final class ApplyCoGroupFunction & Serializable, VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction, Edge, T>, ResultTypeQueryable { + private static final class ApplyCoGroupFunction implements CoGroupFunction< + Vertex, Edge, T>, ResultTypeQueryable { private EdgesFunctionWithVertexValue function; @@ -866,7 +861,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyCoGroupFunctionOnAllEdges & Serializable, VV extends Serializable, EV extends Serializable, T> + private static final class ApplyCoGroupFunctionOnAllEdges implements CoGroupFunction, Tuple2>, T>, ResultTypeQueryable { private EdgesFunctionWithVertexValue function; @@ -915,7 +910,7 @@ public class Graph & Serializable, VV extends Serializab } @ForwardedFields("0->1;1->0;2->2") - private static final class ReverseEdgesMap & Serializable, EV extends Serializable> + private static final class ReverseEdgesMap implements MapFunction, Edge> { public Edge map(Edge value) { @@ -955,7 +950,7 @@ public class Graph & Serializable, VV extends Serializab return vertices.map(new ExtractVertexIDMapper()); } - private static final class ExtractVertexIDMapper & Serializable, VV extends Serializable> + private static final class ExtractVertexIDMapper implements MapFunction, K> { @Override public K map(Vertex vertex) { @@ -970,7 +965,7 @@ public class Graph & Serializable, VV extends Serializab return edges.map(new ExtractEdgeIDsMapper()); } - private static final class ExtractEdgeIDsMapper & Serializable, EV extends Serializable> + private static final class ExtractEdgeIDsMapper implements MapFunction, Tuple2> { @Override public Tuple2 map(Edge edge) throws Exception { @@ -978,67 +973,6 @@ public class Graph & Serializable, VV extends Serializab } } - /** - * Checks the weak connectivity of a graph. - * - * @param maxIterations - * the maximum number of iterations for the inner delta iteration - * @return true if the graph is weakly connected. - */ - public boolean isWeaklyConnected(int maxIterations) throws Exception { - // first, convert to an undirected graph - Graph graph = this.getUndirected(); - - DataSet vertexIds = graph.getVertexIds(); - DataSet> verticesWithInitialIds = vertexIds - .map(new DuplicateVertexIDMapper()); - - DataSet> edgeIds = graph.getEdgeIds(); - - DeltaIteration, Tuple2> iteration = verticesWithInitialIds - .iterateDelta(verticesWithInitialIds, maxIterations, 0); - - DataSet> changes = iteration.getWorkset() - .join(edgeIds, JoinHint.REPARTITION_SORT_MERGE) - .where(0).equalTo(0).with(new FindNeighborsJoin()) - .groupBy(0).aggregate(Aggregations.MIN, 1) - .join(iteration.getSolutionSet(), JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0) - .with(new VertexWithNewComponentJoin()); - - DataSet> components = iteration.closeWith(changes, changes); - return components.groupBy(1).reduceGroup(new EmitFirstReducer()).count() == 1; - } - - private static final class DuplicateVertexIDMapper implements MapFunction> { - @Override - public Tuple2 map(K k) { - return new Tuple2(k, k); - } - } - - private static final class FindNeighborsJoin implements JoinFunction, Tuple2, Tuple2> { - @Override - public Tuple2 join(Tuple2 vertexWithComponent, Tuple2 edge) { - return new Tuple2(edge.f1, vertexWithComponent.f1); - } - } - - private static final class VertexWithNewComponentJoin> - implements FlatJoinFunction, Tuple2, Tuple2> { - @Override - public void join(Tuple2 candidate, Tuple2 old, Collector> out) { - if (candidate.f1.compareTo(old.f1) < 0) { - out.collect(candidate); - } - } - } - - private static final class EmitFirstReducer implements GroupReduceFunction, Tuple2> { - public void reduce(Iterable> values, Collector> out) { - out.collect(values.iterator().next()); - } - } - /** * Adds the input vertex and edges to the graph. If the vertex already * exists in the graph, it will not be added again, but the given edges @@ -1098,7 +1032,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(newVertices, newEdges, this.context); } - private static final class RemoveVertexFilter & Serializable, VV extends Serializable> + private static final class RemoveVertexFilter implements FilterFunction> { private Vertex vertexToRemove; @@ -1113,7 +1047,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class VertexRemovalEdgeFilter & Serializable, VV extends Serializable, EV extends Serializable> + private static final class VertexRemovalEdgeFilter implements FilterFunction> { private Vertex vertexToRemove; @@ -1147,7 +1081,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(this.vertices, newEdges, this.context); } - private static final class EdgeRemovalEdgeFilter & Serializable, EV extends Serializable> + private static final class EdgeRemovalEdgeFilter implements FilterFunction> { private Edge edgeToRemove; @@ -1336,7 +1270,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyNeighborGroupReduceFunction & Serializable, VV extends Serializable, EV extends Serializable, T> + private static final class ApplyNeighborGroupReduceFunction implements GroupReduceFunction, Vertex>, T>, ResultTypeQueryable { private NeighborsFunction function; @@ -1355,7 +1289,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectVertexWithNeighborValueJoin & Serializable, VV extends Serializable, EV extends Serializable> + private static final class ProjectVertexWithNeighborValueJoin implements FlatJoinFunction, Vertex, Tuple2> { private int fieldPosition; @@ -1371,8 +1305,9 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectVertexIdJoin & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatJoinFunction, Vertex, Tuple3, Vertex>> { + private static final class ProjectVertexIdJoin implements FlatJoinFunction< + Edge, Vertex, Tuple3, Vertex>> { + private int fieldPosition; public ProjectVertexIdJoin(int position) { this.fieldPosition = position; @@ -1384,8 +1319,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectNeighborValue & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatJoinFunction>, Vertex, Tuple2> { + private static final class ProjectNeighborValue implements FlatJoinFunction< + Tuple3>, Vertex, Tuple2> { public void join(Tuple3> keysWithEdge, Vertex neighbor, Collector> out) { @@ -1394,8 +1329,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectEdgeWithNeighbor & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatJoinFunction>, Vertex, Tuple3, Vertex>> { + private static final class ProjectEdgeWithNeighbor implements FlatJoinFunction< + Tuple3>, Vertex, Tuple3, Vertex>> { public void join(Tuple3> keysWithEdge, Vertex neighbor, Collector, Vertex>> out) { @@ -1403,8 +1338,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyNeighborCoGroupFunction & Serializable, VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction, Tuple2, Vertex>, T>, ResultTypeQueryable { + private static final class ApplyNeighborCoGroupFunction implements CoGroupFunction< + Vertex, Tuple2, Vertex>, T>, ResultTypeQueryable { private NeighborsFunctionWithVertexValue function; @@ -1423,7 +1358,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyCoGroupFunctionOnAllNeighbors & Serializable, VV extends Serializable, EV extends Serializable, T> + private static final class ApplyCoGroupFunctionOnAllNeighbors implements CoGroupFunction, Tuple3, Vertex>, T>, ResultTypeQueryable { private NeighborsFunctionWithVertexValue function; @@ -1513,8 +1448,7 @@ public class Graph & Serializable, VV extends Serializab } @ForwardedFields("f0") - private static final class ApplyNeighborReduceFunction & Serializable, VV extends Serializable> - implements ReduceFunction> { + private static final class ApplyNeighborReduceFunction implements ReduceFunction> { private ReduceNeighborsFunction function; @@ -1561,8 +1495,7 @@ public class Graph & Serializable, VV extends Serializab } @ForwardedFields("f0") - private static final class ApplyReduceFunction & Serializable, EV extends Serializable> - implements ReduceFunction> { + private static final class ApplyReduceFunction implements ReduceFunction> { private ReduceEdgesFunction function; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java index ceeeaf4a2a6750f5541a6f36486f9c484411ad79..04181d56e7a15e61da342bffe6985a9fe2c15c3c 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java @@ -18,14 +18,12 @@ package org.apache.flink.graph; -import java.io.Serializable; - /** * @param key type * @param vertex value type * @param edge value type */ -public interface GraphAlgorithm & Serializable, VV extends Serializable, EV extends Serializable> { +public interface GraphAlgorithm { public Graph run(Graph input) throws Exception; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java index b43f9d11bd4c4b30cd604eb1663768a78b0b9954..a21b23d7732378023baa1596d64d2d3a3251c977 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java @@ -34,8 +34,7 @@ import org.apache.flink.util.Collector; * @param the edge value type * @param the type of the return value */ -public interface NeighborsFunction & Serializable, VV extends Serializable, - EV extends Serializable, O> extends Function, Serializable { +public interface NeighborsFunction extends Function, Serializable { void iterateNeighbors(Iterable, Vertex>> neighbors, Collector out) throws Exception; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java index 32d184dffcaa8acdb3dd90ab1cbe2b06621e3476..fdf54fa4fdaa4ffa78b703a2561e6f6afe93c06d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java @@ -34,8 +34,7 @@ import org.apache.flink.util.Collector; * @param the edge value type * @param the type of the return value */ -public interface NeighborsFunctionWithVertexValue & Serializable, VV extends Serializable, - EV extends Serializable, O> extends Function, Serializable { +public interface NeighborsFunctionWithVertexValue extends Function, Serializable { void iterateNeighbors(Vertex vertex, Iterable, Vertex>> neighbors, Collector out) throws Exception; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java index 707efbfdea7f7dc96aa0abdcda71074ee83a1b8f..84eec51f16ba2e3d3314964007dc8713c432d204 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java @@ -28,7 +28,7 @@ import java.io.Serializable; * * @param the edge value type */ -public interface ReduceEdgesFunction extends Function, Serializable { +public interface ReduceEdgesFunction extends Function, Serializable { EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue); } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java index 4b5a93038ba528f069aa1578db0fc9eaa7f470ff..fc5295dd7b7393bbfd84b18380844494a5b94cd4 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java @@ -29,7 +29,7 @@ import java.io.Serializable; * * @param the vertex value type */ -public interface ReduceNeighborsFunction extends Function, Serializable { +public interface ReduceNeighborsFunction extends Function, Serializable { VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue); } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java index b85987dc62636a5530476d534d3dec3fca197781..dee34802c54248133a481d8aeccc9e484a0a4cf2 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java @@ -20,8 +20,6 @@ package org.apache.flink.graph; import org.apache.flink.api.java.tuple.Tuple5; -import java.io.Serializable; - /** * A Triplet stores and retrieves the edges along with their corresponding source and target vertices. * Triplets can be obtained from the input graph via the {@link org.apache.flink.graph.Graph#getTriplets()} method. @@ -30,8 +28,7 @@ import java.io.Serializable; * @param the vertex value type * @param the edge value type */ -public class Triplet & Serializable, VV extends Serializable, EV extends Serializable> - extends Tuple5 { +public class Triplet extends Tuple5 { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java index 2f718438bc7e98bae36aca009ccdf9413d8ca180..c5eb97393ae942ee48c4652bf504ff3acd543c36 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java @@ -18,8 +18,6 @@ package org.apache.flink.graph; -import java.io.Serializable; - import org.apache.flink.api.java.tuple.Tuple2; /** @@ -29,8 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2; * @param * @param */ -public class Vertex & Serializable, V extends Serializable> - extends Tuple2 { +public class Vertex extends Tuple2 { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java index a4963e0fa0e31e853c905a55381de218d950326e..7d24253d7581beeafd9fd93abf97a1890d24f108 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java @@ -25,8 +25,7 @@ import org.apache.flink.util.Collector; import java.io.Serializable; @SuppressWarnings("serial") -public abstract class ApplyFunction & Serializable, VV extends Serializable, M> - implements Serializable { +public abstract class ApplyFunction implements Serializable { public abstract void apply(M newValue, VV currentValue); diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java index 4ffae8d2d7c63d33b9c9ad66c8dd0f5ebd069e40..1c4b2c429f24b6043d7d95ed814576c35406333a 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext; import java.io.Serializable; @SuppressWarnings("serial") -public abstract class GatherFunction implements Serializable { +public abstract class GatherFunction implements Serializable { public abstract M gather(Neighbor neighbor); diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java index 22be591696537a5a2de4583b35e0d6310938f3ac..1de38390ffa85aa5e883a208760c18f437b82bfe 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java @@ -39,8 +39,6 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Vertex; import org.apache.flink.util.Collector; -import java.io.Serializable; - /** * This class represents iterative graph computations, programmed in a gather-sum-apply perspective. * @@ -49,8 +47,7 @@ import java.io.Serializable; * @param The type of the edge value in the graph * @param The intermediate type used by the gather, sum and apply functions */ -public class GatherSumApplyIteration & Serializable, - VV extends Serializable, EV extends Serializable, M> implements CustomUnaryOperation, +public class GatherSumApplyIteration implements CustomUnaryOperation, Vertex> { private DataSet> vertexDataSet; @@ -159,10 +156,10 @@ public class GatherSumApplyIteration & Serializable, * * @return An in stance of the gather-sum-apply graph computation operator. */ - public static final & Serializable, VV extends Serializable, EV extends Serializable, M> - GatherSumApplyIteration withEdges(DataSet> edges, - GatherFunction gather, SumFunction sum, ApplyFunction apply, - int maximumNumberOfIterations) { + public static final GatherSumApplyIteration + withEdges(DataSet> edges, GatherFunction gather, + SumFunction sum, ApplyFunction apply, int maximumNumberOfIterations) { + return new GatherSumApplyIteration(gather, sum, apply, edges, maximumNumberOfIterations); } @@ -172,8 +169,7 @@ public class GatherSumApplyIteration & Serializable, @SuppressWarnings("serial") @ForwardedFields("f0") - private static final class GatherUdf & Serializable, VV extends Serializable, - EV extends Serializable, M> extends RichMapFunction>, + private static final class GatherUdf extends RichMapFunction>, Tuple2> implements ResultTypeQueryable> { private final GatherFunction gatherFunction; @@ -210,8 +206,7 @@ public class GatherSumApplyIteration & Serializable, } @SuppressWarnings("serial") - private static final class SumUdf & Serializable, VV extends Serializable, - EV extends Serializable, M> extends RichReduceFunction> + private static final class SumUdf extends RichReduceFunction> implements ResultTypeQueryable>{ private final SumFunction sumFunction; @@ -249,8 +244,7 @@ public class GatherSumApplyIteration & Serializable, } @SuppressWarnings("serial") - private static final class ApplyUdf & Serializable, - VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction, + private static final class ApplyUdf extends RichFlatJoinFunction, Vertex, Vertex> implements ResultTypeQueryable> { private final ApplyFunction applyFunction; @@ -289,8 +283,7 @@ public class GatherSumApplyIteration & Serializable, @SuppressWarnings("serial") @ForwardedFieldsSecond("f1->f0") - private static final class ProjectKeyWithNeighbor & Serializable, - VV extends Serializable, EV extends Serializable> implements FlatJoinFunction< + private static final class ProjectKeyWithNeighbor implements FlatJoinFunction< Vertex, Edge, Tuple2>> { public void join(Vertex vertex, Edge edge, Collector>> out) { diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java index 5a06af98dfdb3951134bf4cbe2276fd99f3726ed..7fa1ed2227be832ed994b063953cccf558574f5c 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java @@ -20,8 +20,6 @@ package org.apache.flink.graph.gsa; import org.apache.flink.api.java.tuple.Tuple2; -import java.io.Serializable; - /** * This class represents a pair * This is a wrapper around Tuple2 for convenience in the GatherFunction @@ -29,8 +27,7 @@ import java.io.Serializable; * @param the edge value type */ @SuppressWarnings("serial") -public class Neighbor - extends Tuple2 { +public class Neighbor extends Tuple2 { public Neighbor() {} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java index 4836af687c24406e97c207adddd5db639344c4b2..0a5e4aee06a7236e6b4783c25c14db14f75ccb48 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext; import java.io.Serializable; @SuppressWarnings("serial") -public abstract class SumFunction implements Serializable { +public abstract class SumFunction implements Serializable { public abstract M sum(M arg0, M arg1); diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java index ff6fe85c1296dbe4e97e1d817c45c33ffcdbef57..d63a4c3a9850f6e6533301f9855678f7d1ef2d80 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -25,7 +25,6 @@ import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.types.NullValue; -import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -41,8 +40,7 @@ import java.util.Map.Entry; * */ @SuppressWarnings("serial") -public class LabelPropagation & Serializable> - implements GraphAlgorithm { +public class LabelPropagation implements GraphAlgorithm { private final int maxIterations; @@ -63,8 +61,7 @@ public class LabelPropagation & Serializable> * Function that updates the value of a vertex by adopting the most frequent * label among its in-neighbors */ - public static final class UpdateVertexLabel & Serializable> - extends VertexUpdateFunction { + public static final class UpdateVertexLabel extends VertexUpdateFunction { public void updateVertex(K vertexKey, Long vertexValue, MessageIterator inMessages) { @@ -105,8 +102,7 @@ public class LabelPropagation & Serializable> /** * Sends the vertex label to all out-neighbors */ - public static final class SendNewLabelToNeighbors & Serializable> - extends MessagingFunction { + public static final class SendNewLabelToNeighbors extends MessagingFunction { public void sendMessages(K vertexKey, Long newLabel) { sendMessageToAllNeighbors(newLabel); diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java index 48c9a513f5fa74028117ff595397c15894198ba1..bb0a1d1aec76829a7b00bde4a4d96f30593c0c45 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java @@ -18,8 +18,6 @@ package org.apache.flink.graph.library; -import java.io.Serializable; - import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; @@ -27,8 +25,7 @@ import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; -public class PageRank & Serializable> implements - GraphAlgorithm { +public class PageRank implements GraphAlgorithm { private double beta; private int maxIterations; @@ -51,8 +48,7 @@ public class PageRank & Serializable> implements * ranks from all incoming messages and then applying the dampening formula. */ @SuppressWarnings("serial") - public static final class VertexRankUpdater & Serializable> - extends VertexUpdateFunction { + public static final class VertexRankUpdater extends VertexUpdateFunction { private final double beta; private final long numVertices; @@ -82,8 +78,7 @@ public class PageRank & Serializable> implements * value. */ @SuppressWarnings("serial") - public static final class RankMessenger & Serializable> - extends MessagingFunction { + public static final class RankMessenger extends MessagingFunction { private final long numVertices; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 262b2c59134eb853facfa22eccb4847a974c1bf2..f4f8b27dd26abdcec49fcff9587e8929f15696b8 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -27,11 +27,8 @@ import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; -import java.io.Serializable; - @SuppressWarnings("serial") -public class SingleSourceShortestPaths & Serializable> - implements GraphAlgorithm { +public class SingleSourceShortestPaths implements GraphAlgorithm { private final K srcVertexId; private final Integer maxIterations; @@ -49,8 +46,7 @@ public class SingleSourceShortestPaths & Serializable> maxIterations); } - public static final class InitVerticesMapper & Serializable> - implements MapFunction, Double> { + public static final class InitVerticesMapper implements MapFunction, Double> { private K srcVertexId; @@ -73,8 +69,7 @@ public class SingleSourceShortestPaths & Serializable> * * @param */ - public static final class VertexDistanceUpdater & Serializable> - extends VertexUpdateFunction { + public static final class VertexDistanceUpdater extends VertexUpdateFunction { @Override public void updateVertex(K vertexKey, Double vertexValue, @@ -100,8 +95,7 @@ public class SingleSourceShortestPaths & Serializable> * * @param */ - public static final class MinDistanceMessenger & Serializable> - extends MessagingFunction { + public static final class MinDistanceMessenger extends MessagingFunction { @Override public void sendMessages(K vertexKey, Double newDistance) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java index e8a297f5d654e0324096bc25901b068748f34ed6..b7e74e38dfc108ab29185f41dbfaff60ba7b9124 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java @@ -38,8 +38,7 @@ import org.apache.flink.util.Collector; * @param The type of the message sent between vertices along the edges. * @param The type of the values that are associated with the edges. */ -public abstract class MessagingFunction & Serializable, - VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable { +public abstract class MessagingFunction implements Serializable { private static final long serialVersionUID = 1L; @@ -198,8 +197,7 @@ public abstract class MessagingFunction this.edgesUsed = false; } - private static final class EdgesIterator & Serializable, - EdgeValue extends Serializable> + private static final class EdgesIterator implements Iterator>, Iterable> { private Iterator> input; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java index ca66521276c2b3eeaa2cbc6f354b19fb6f8892e5..5ad1420b1d161ee34594cb20df4b2302e069827b 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java @@ -18,7 +18,6 @@ package org.apache.flink.graph.spargel; -import java.io.Serializable; import java.util.Iterator; import java.util.Map; @@ -69,8 +68,7 @@ import org.apache.flink.util.Collector; * @param The type of the message sent between vertices along the edges. * @param The type of the values that are associated with the edges. */ -public class VertexCentricIteration & Serializable, VertexValue extends Serializable, - Message, EdgeValue extends Serializable> +public class VertexCentricIteration implements CustomUnaryOperation, Vertex> { private final VertexUpdateFunction updateFunction; @@ -218,8 +216,7 @@ public class VertexCentricIteration & Se * * @return An in stance of the vertex-centric graph computation operator. */ - public static final & Serializable, VertexValue extends Serializable, - Message, EdgeValue extends Serializable> + public static final VertexCentricIteration withEdges( DataSet> edgesWithValue, VertexUpdateFunction uf, @@ -233,8 +230,7 @@ public class VertexCentricIteration & Se // Wrapping UDFs // -------------------------------------------------------------------------------------------- - private static final class VertexUpdateUdf & Serializable, - VertexValue extends Serializable, Message> + private static final class VertexUpdateUdf extends RichCoGroupFunction, Vertex, Vertex> implements ResultTypeQueryable> { @@ -308,8 +304,7 @@ public class VertexCentricIteration & Se /* * UDF that encapsulates the message sending function for graphs where the edges have an associated value. */ - private static final class MessagingUdfWithEdgeValues & Serializable, - VertexValue extends Serializable, Message, EdgeValue extends Serializable> + private static final class MessagingUdfWithEdgeValues extends RichCoGroupFunction, Vertex, Tuple2> implements ResultTypeQueryable> { diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java index 5a7cd5cb0a5d7995d52d0a38ce7fc6b4f8dd41b8..561c87a5c25443350c13cbfc77db57c221ee47c6 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java @@ -36,8 +36,7 @@ import org.apache.flink.util.Collector; * The vertex value type. * The message type. */ -public abstract class VertexUpdateFunction & Serializable, - VertexValue extends Serializable, Message> implements Serializable { +public abstract class VertexUpdateFunction implements Serializable { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java index a7b7b62f71feea1ce65dafdfbefc4284ede079ae..c83fc9c5ce7db83ccc6afa64c4d929ed3c461d2e 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java @@ -18,14 +18,11 @@ package org.apache.flink.graph.utils; -import java.io.Serializable; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; -public class EdgeToTuple3Map & Serializable, - EV extends Serializable> implements MapFunction, Tuple3> { +public class EdgeToTuple3Map implements MapFunction, Tuple3> { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java index d58e4ff36fc5a525cea85df64df072a0d7ccbb1d..f9645dc8583e8b1eebe2b54b316849ac3c980e2a 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java @@ -18,14 +18,11 @@ package org.apache.flink.graph.utils; -import java.io.Serializable; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Vertex; -public class Tuple2ToVertexMap & Serializable, - VV extends Serializable> implements MapFunction, Vertex> { +public class Tuple2ToVertexMap implements MapFunction, Vertex> { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java index 3668dd28ec0de987ef44ab95678d4126258fe822..afeff8921ccea7bf9c18fcb24b846fe7679272a5 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java @@ -18,8 +18,6 @@ package org.apache.flink.graph.utils; -import java.io.Serializable; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; @@ -30,8 +28,7 @@ import org.apache.flink.graph.Edge; * @param * @param */ -public class Tuple3ToEdgeMap & Serializable, - EV extends Serializable> implements MapFunction, Edge> { +public class Tuple3ToEdgeMap implements MapFunction, Edge> { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java index 318e1ede6209ec6023cbb9a34b84582e2bc9e527..9ce6f33c1819e4b9ae63670f08dd47cae1eb35ec 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java @@ -18,14 +18,11 @@ package org.apache.flink.graph.utils; -import java.io.Serializable; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Vertex; -public class VertexToTuple2Map & Serializable, - VV extends Serializable> implements MapFunction, Tuple2> { +public class VertexToTuple2Map implements MapFunction, Tuple2> { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java index 101e82c35b0ca2d9eb52529be6ffeaf5ce788e4c..75b672c872b424ed410dec5615edbf7e212d7326 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java @@ -30,8 +30,7 @@ import org.apache.flink.graph.Graph; * @param the edge value type */ @SuppressWarnings("serial") -public abstract class GraphValidator & Serializable, VV extends Serializable, EV extends Serializable> - implements Serializable { +public abstract class GraphValidator implements Serializable { public abstract boolean validate(Graph graph) throws Exception; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java index cc06ca71de1eb8000444268ad254294e554df80f..33d469b9ae6d62161aef67acce4c30b3ec42ad75 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java @@ -28,11 +28,8 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.util.Collector; -import java.io.Serializable; - @SuppressWarnings("serial") -public class InvalidVertexIdsValidator & Serializable, VV extends Serializable, EV extends Serializable> - extends GraphValidator { +public class InvalidVertexIdsValidator extends GraphValidator { /** * Checks that the edge set input contains valid vertex Ids, i.e. that they @@ -51,16 +48,14 @@ public class InvalidVertexIdsValidator & Serializable, V return invalidIds.map(new KToTupleMap()).count() == 0; } - private static final class MapEdgeIds & Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple1> { + private static final class MapEdgeIds implements FlatMapFunction, Tuple1> { public void flatMap(Edge edge, Collector> out) { out.collect(new Tuple1(edge.f0)); out.collect(new Tuple1(edge.f1)); } } - private static final class GroupInvalidIds & Serializable, VV extends Serializable> - implements CoGroupFunction, Tuple1, K> { + private static final class GroupInvalidIds implements CoGroupFunction, Tuple1, K> { public void coGroup(Iterable> vertexId, Iterable> edgeId, Collector out) { if (!(vertexId.iterator().hasNext())) { diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java deleted file mode 100644 index 9db449e3e4520cf9c03a7d22c606cdaaed2a030e..0000000000000000000000000000000000000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class WeaklyConnectedITCase extends MultipleProgramsTestBase { - - public WeaklyConnectedITCase(TestExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithConnectedDirected() throws Exception { - /* - * Test isWeaklyConnected() with a connected, directed graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath); - - env.execute(); - expectedResult = "true\n"; - } - - @Test - public void testWithDisconnectedDirected() throws Exception { - /* - * Test isWeaklyConnected() with a disconnected, directed graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - - env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath); - - env.execute(); - expectedResult = "false\n"; - } - - @Test - public void testWithConnectedUndirected() throws Exception { - /* - * Test isWeaklyConnected() with a connected, undirected graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env).getUndirected(); - - env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath); - - env.execute(); - expectedResult = "true\n"; - } - - @Test - public void testWithDisconnectedUndirected() throws Exception { - /* - * Test isWeaklyConnected() with a disconnected, undirected graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected(); - - env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath); - - env.execute(); - expectedResult = "false\n"; - } -}