diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index fa49dcececcd6730e6584139adfb1e4e14601cee..19d442aafb5a6a3d95e677acc1bdeb7c71df1c48 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -251,14 +251,26 @@ Gelly includes the following methods for adding and removing vertices and edges // adds a Vertex and the given edges to the Graph. If the Vertex already exists, it will not be added again, but the given edges will. Graph addVertex(final Vertex vertex, List> edges) +// adds a data set of vertices and a list of edges to the Graph. If the vertices already exist in the graph, they will not be added once more, however the edges will. +Graph addVertices(DataSet> verticesToAdd, List> edges) + // adds an Edge to the Graph. If the source and target vertices do not exist in the graph, they will also be added. Graph addEdge(Vertex source, Vertex target, EV edgeValue) +// adds a data set of edges to the Graph. If the vertices already exist in the graph, they will not be added, however the edges will. +Graph addEdges(DataSet> newEdges, DataSet> newVertices) + // removes the given Vertex and its edges from the Graph. Graph removeVertex(Vertex vertex) +// removes the given data set of Vertices and their edges from the Graph +Graph removeVertices(DataSet> verticesToBeRemoved) + // removes *all* edges that match the given Edge from the Graph. 
Graph removeEdge(Edge edge) + +// removes *all* edges that match the edges in the given data set +Graph removeEdges(DataSet> edgesToBeRemoved) {% endhighlight %} Neighborhood Methods diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 6b632bc52541fc537827f5ffbc0a6a0c2ba42a7b..151450de60a29cabbce345a4ce4a61910ddc822e 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -27,6 +27,7 @@ import java.util.Arrays; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; @@ -1009,22 +1010,38 @@ public class Graph { * will. * * @param vertex the vertex to add to the graph - * @param edges a list of edges to add to the grap - * @return the new graph containing the existing and newly added vertices + * @param edges a list of edges to add to the graph + * @return the new graph containing the existing and newly added vertex * and edges */ @SuppressWarnings("unchecked") public Graph addVertex(final Vertex vertex, List> edges) { DataSet> newVertex = this.context.fromElements(vertex); - // Take care of empty edge set + return addVertices(newVertex, edges); + } + + /** + * Adds the data set of vertices and the edges, passed as input, to the graph. + * If the vertices already exist in the graph, they will not be added once more, + * however the edges will. 
+ * + * @param verticesToAdd the data set of vertices to add + * @param edges a list of edges to add to the graph + * @return the new graph containing the existing and newly added vertices + * and edges + */ + @SuppressWarnings("unchecked") + public Graph addVertices(DataSet> verticesToAdd, List> edges) { + + // Consider empty edge set if (edges.isEmpty()) { - return new Graph(this.vertices.union(newVertex) + return new Graph(this.vertices.union(verticesToAdd) .distinct(), this.edges, this.context); } // Add the vertex and its edges - DataSet> newVertices = this.vertices.union(newVertex).distinct(); + DataSet> newVertices = this.vertices.union(verticesToAdd).distinct(); DataSet> newEdges = this.edges.union(context.fromCollection(edges)); return new Graph(newVertices, newEdges, this.context); @@ -1048,6 +1065,23 @@ public class Graph { return this.union(partialGraph); } + /** + * Adds the given data set of edges to the graph. If the vertices do not already exist in the + * graph, they will also be added. + * + * If the vertex values are not required during the computation, it is recommended to use + * addEdges(edges) + * + * @param newEdges the data set of edges to be added + * @param newVertices their corresponding vertices and vertex values + * @return a new graph containing the existing vertices and edges plus the newly added edges and vertices. + */ + @SuppressWarnings("unchecked") + public Graph addEdges(DataSet> newEdges, DataSet> newVertices) { + Graph partialGraph = fromDataSet(newVertices, newEdges, context); + return this.union(partialGraph); + } + /** * Removes the given vertex and its edges from the graph. * @@ -1100,6 +1134,55 @@ public class Graph { } /** + * Removes the given data set of vertices and their edges from the graph. + * + * @param verticesToBeRemoved the data set of vertices to be removed + * @return the resulting graph containing the initial vertices and edges minus the vertices + * and edges removed. 
+ */ + public Graph removeVertices(DataSet> verticesToBeRemoved) { + + DataSet> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0) + .with(new VerticesRemovalCoGroup()); + + DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0) + // if the edge source was removed, the edge will also be removed + .with(new ProjectEdgeToBeRemoved()) + // if the edge target was removed, the edge will also be removed + .join(newVertices).where(1).equalTo(0) + .with(new ProjectEdge()); + + return new Graph(newVertices, newEdges, context); + } + + private static final class VerticesRemovalCoGroup implements CoGroupFunction, Vertex, Vertex> { + + @Override + public void coGroup(Iterable> vertex, Iterable> vertexToBeRemoved, + Collector> out) throws Exception { + + final Iterator> vertexIterator = vertex.iterator(); + final Iterator> vertexToBeRemovedIterator = vertexToBeRemoved.iterator(); + Vertex next; + + if (vertexIterator.hasNext()) { + if (!vertexToBeRemovedIterator.hasNext()) { + next = vertexIterator.next(); + out.collect(next); + } + } + } + } + + @ForwardedFieldsSecond("f0; f1; f2") + private static final class ProjectEdgeToBeRemoved implements JoinFunction, Edge, Edge> { + @Override + public Edge join(Vertex vertex, Edge edge) throws Exception { + return edge; + } + } + + /** * Removes all edges that match the given edge from the graph. * * @param edge the edge to remove @@ -1126,6 +1209,39 @@ public class Graph { } } + /** + * Removes all the edges that match the edges in the given data set from the graph. 
+ * + * @param edgesToBeRemoved the data set of edges to be removed (matched on fields 0 and 1 only, i.e. the edge value is ignored) + * @return a new graph where the edges have been removed and in which the vertices remained intact + */ + public Graph removeEdges(DataSet> edgesToBeRemoved) { + + DataSet> newEdges = getEdges().coGroup(edgesToBeRemoved).where(0,1).equalTo(0,1) + .with(new EdgeRemovalCoGroup()); + + return new Graph(this.vertices, newEdges, context); + } + + private static final class EdgeRemovalCoGroup implements CoGroupFunction, Edge, Edge> { + + @Override + public void coGroup(Iterable> edge, Iterable> edgeToBeRemoved, + Collector> out) throws Exception { + + final Iterator> edgeIterator = edge.iterator(); + final Iterator> edgeToBeRemovedIterator = edgeToBeRemoved.iterator(); + Edge next; + + if (!edgeToBeRemovedIterator.hasNext()) { /* nothing to remove for this key: keep every parallel edge of the group, not just the first */ + while (edgeIterator.hasNext()) { + next = edgeIterator.next(); + out.collect(next); + } + } + } + } + /** * Performs union on the vertices and edges sets of the input graphs * removing duplicate vertices but maintaining duplicate edges. 
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java index dce34a80bcc439cb3d23b226f1fac13d29085e2d..6aa4ef3fc4c2d569628f7e0128e64fb26bc31d07 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java @@ -21,12 +21,14 @@ package org.apache.flink.graph.test.operations; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -85,6 +87,39 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase { "6,1,61\n"; } + @Test + public void testAddVertices() throws Exception { + /* + * Test addVertices() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> edges = new ArrayList>(); + edges.add(new Edge(6L, 1L, 61L)); + edges.add(new Edge(7L, 1L, 71L)); + + DataSet> newVertices = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + graph = graph.addVertices(newVertices, edges); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n" + + "7,1,71\n"; + } + @Test public 
void testAddVertexExisting() throws Exception { /* @@ -111,6 +146,72 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase { "5,1,51\n"; } + @Test + public void testAddVerticesBothExisting() throws Exception { + /* + * Test addVertices() -- add two existing vertices + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> edges = new ArrayList>(); + edges.add(new Edge(1L, 5L, 15L)); + edges.add(new Edge(3L, 1L, 31L)); + + DataSet> newVertices = env.fromElements(new Vertex(1L, 1L), + new Vertex(3L, 3L)); + graph = graph.addVertices(newVertices, edges); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "1,5,15\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "3,1,31\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testAddVerticesOneExisting() throws Exception { + /* + * Test addVertices() -- add an existing vertex + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> edges = new ArrayList>(); + edges.add(new Edge(1L, 5L, 15L)); + edges.add(new Edge(6L, 1L, 61L)); + + DataSet> newVertices = env.fromElements(new Vertex(1L, 1L), + new Vertex(6L, 6L)); + graph = graph.addVertices(newVertices, edges); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "1,5,15\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n"; + } + @Test public void testAddVertexNoEdges() throws Exception { /* @@ -134,6 +235,34 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase { "6,6\n"; } + @Test + public void testAddVerticesNoEdges() throws 
Exception { + /* + * Test addVertices() -- add vertices with empty edge set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + List> edges = new ArrayList>(); + + DataSet> newVertices = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + graph = graph.addVertices(newVertices, edges); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n" + + "6,6\n" + + "7,7\n"; + } + @Test public void testRemoveVertex() throws Exception { /* @@ -154,6 +283,28 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase { "3,4,34\n"; } + @Test + public void testRemoveVertices() throws Exception { + /* + * Test removeVertices() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> verticesToBeRemoved = env.fromElements(new Vertex(1L, 1L), + new Vertex(2L, 2L)); + + graph = graph.removeVertices(verticesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + @Test public void testRemoveInvalidVertex() throws Exception { /* @@ -176,6 +327,79 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase { "4,5,45\n" + "5,1,51\n"; } + + @Test + public void testRemoveOneValidOneInvalidVertex() throws Exception { + /* + * Test removeVertices() -- remove one invalid vertex and a valid one + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> verticesToBeRemoved = 
env.fromElements(new Vertex(1L, 1L), + new Vertex(7L, 7L)); + + graph = graph.removeVertices(verticesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + + @Test + public void testRemoveBothInvalidVertices() throws Exception { + /* + * Test removeVertices() -- remove two invalid vertices + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> verticesToBeRemoved = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + + graph = graph.removeVertices(verticesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testRemoveBothInvalidVerticesVertexResult() throws Exception { + /* + * Test removeVertices() -- remove two invalid vertices and verify the data set of vertices + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> verticesToBeRemoved = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + + graph = graph.removeVertices(verticesToBeRemoved); + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n"; + } @Test public void testAddEdge() throws Exception { @@ -201,7 +425,39 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase { "5,1,51\n" + "6,1,61\n"; } - + + @Test + public void testAddEdges() throws Exception { + /* + * Test addEdges() -- simple case + */ + + final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> edgesToBeAdded = env.fromElements(new Edge(6L, 1L, 61L), + new Edge(7L, 1L, 71L)); + DataSet> verticesToBeAdded = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + + graph = graph.addEdges(edgesToBeAdded, verticesToBeAdded); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n" + + "7,1,71\n"; + } + @Test public void testAddExistingEdge() throws Exception { /* @@ -226,9 +482,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase { "4,5,45\n" + "5,1,51\n"; } - + @Test - public void testRemoveVEdge() throws Exception { + public void testRemoveEdge() throws Exception { /* * Test removeEdge() -- simple case */ @@ -248,7 +504,56 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase { "3,5,35\n" + "4,5,45\n"; } - + + @Test + public void testRemoveEdges() throws Exception { + /* + * Test removeEdges() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> edgesToBeRemoved = env.fromElements(new Edge(5L, 1L, 51L), + new Edge(2L, 3L, 23L)); + + graph = graph.removeEdges(edgesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + + @Test + public void testRemoveSameEdgeTwice() throws Exception { + /* + * Test removeEdges() -- try to remove the same edge twice + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> edgesToBeRemoved = env.fromElements(new Edge(5L, 1L, 51L), + new Edge(5L, 1L, 51L)); + + graph = graph.removeEdges(edgesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + @Test public void testRemoveInvalidEdge() throws Exception { /* @@ -271,4 +576,30 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase { "4,5,45\n" + "5,1,51\n"; } + + @Test + public void testRemoveOneValidOneInvalidEdge() throws Exception { + /* + * Test removeEdges() -- one edge is valid, the other is invalid + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> edgesToBeRemoved = env.fromElements(new Edge(1L, 1L, 51L), + new Edge(6L, 1L, 61L)); + + graph = graph.removeEdges(edgesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } } \ No newline at end of file