提交 edcfd8bb 编写于 作者: A andralungu 提交者: vasia

[FLINK-2012] [gelly] Added methods to remove/add multiple edges/vertices

This squashes the following commits:

[gelly] Removed trailing comment

[gelly] Made remove methods use a coGroup fun
上级 b68049b7
......@@ -251,14 +251,26 @@ Gelly includes the following methods for adding and removing vertices and edges
// adds a Vertex and the given edges to the Graph. If the Vertex already exists, it will not be added again, but the given edges will.
Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex, List<Edge<K, EV>> edges)
// adds a data set of vertices and a list of edges to the Graph. If the vertices already exist in the graph, they will not be added once more, however the edges will.
Graph<K, VV, EV> addVertices(DataSet<Vertex<K, VV>> verticesToAdd, List<Edge<K, EV>> edges)
// adds an Edge to the Graph. If the source and target vertices do not exist in the graph, they will also be added.
Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue)
// adds a data set of edges to the Graph. If the vertices already exist in the graph, they will not be added, however the edges will.
Graph<K, VV, EV> addEdges(DataSet<Edge<K, EV>> newEdges, DataSet<Vertex<K, VV>> newVertices)
// removes the given Vertex and its edges from the Graph.
Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex)
// removes the given data set of Vertices and their edges from the Graph
Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved)
// removes *all* edges that match the given Edge from the Graph.
Graph<K, VV, EV> removeEdge(Edge<K, EV> edge)
// removes *all* edges that match the edges in the given data set
Graph<K, VV, EV> removeEdges(DataSet<Edge<K, EV>> edgesToBeRemoved)
{% endhighlight %}
Neighborhood Methods
......
......@@ -27,6 +27,7 @@ import java.util.Arrays;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
......@@ -1009,22 +1010,38 @@ public class Graph<K, VV, EV> {
* will.
*
* @param vertex the vertex to add to the graph
* @param edges a list of edges to add to the grap
* @return the new graph containing the existing and newly added vertices
* @param edges a list of edges to add to the graph
* @return the new graph containing the existing and newly added vertex
* and edges
*/
@SuppressWarnings("unchecked")
public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex, List<Edge<K, EV>> edges) {
DataSet<Vertex<K, VV>> newVertex = this.context.fromElements(vertex);
// Take care of empty edge set
return addVertices(newVertex, edges);
}
/**
* Adds the data set of vertices and the edges, passed as input, to the graph.
* If the vertices already exist in the graph, they will not be added once more,
* however the edges will.
*
* @param verticesToAdd the data set of vertices to add
* @param edges a list of edges to add to the graph
* @return the new graph containing the existing and newly added vertices
* and edges
*/
@SuppressWarnings("unchecked")
public Graph<K, VV, EV> addVertices(DataSet<Vertex<K, VV>> verticesToAdd, List<Edge<K, EV>> edges) {
// Consider empty edge set
if (edges.isEmpty()) {
return new Graph<K, VV, EV>(this.vertices.union(newVertex)
return new Graph<K, VV, EV>(this.vertices.union(verticesToAdd)
.distinct(), this.edges, this.context);
}
// Add the vertex and its edges
DataSet<Vertex<K, VV>> newVertices = this.vertices.union(newVertex).distinct();
DataSet<Vertex<K, VV>> newVertices = this.vertices.union(verticesToAdd).distinct();
DataSet<Edge<K, EV>> newEdges = this.edges.union(context.fromCollection(edges));
return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
......@@ -1048,6 +1065,23 @@ public class Graph<K, VV, EV> {
return this.union(partialGraph);
}
/**
* Adds the given data set of edges to the graph. if the vertices do not already exist in the
* graph, they will also be added.
*
* If the vertex values are not required during the computation, it is recommended to use
* addEdges(edges)
*
* @param newEdges the data set of edges to be added
* @param newVertices their corresponding vertices and vertexValues
* @return a new graph containing the existing vertices and edges plus the newly added edges and vertices.
*/
@SuppressWarnings("unchecked")
public Graph<K, VV, EV> addEdges(DataSet<Edge<K, EV>> newEdges, DataSet<Vertex<K, VV>> newVertices) {
Graph<K, VV, EV> partialGraph = fromDataSet(newVertices, newEdges, context);
return this.union(partialGraph);
}
/**
* Removes the given vertex and its edges from the graph.
*
......@@ -1100,6 +1134,55 @@ public class Graph<K, VV, EV> {
}
/**
* Removes the given data set of vertices and its edges from the graph.
*
* @param verticesToBeRemoved the data set of vertices to be removed
* @return the resulted graph containing the initial vertices and edges minus the vertices
* and edges removed.
*/
public Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved) {
DataSet<Vertex<K, VV>> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0)
.with(new VerticesRemovalCoGroup<K, VV>());
DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
// if the edge source was removed, the edge will also be removed
.with(new ProjectEdgeToBeRemoved<K, VV, EV>())
// if the edge target was removed, the edge will also be removed
.join(newVertices).where(1).equalTo(0)
.with(new ProjectEdge<K, VV, EV>());
return new Graph<K, VV, EV>(newVertices, newEdges, context);
}
private static final class VerticesRemovalCoGroup<K, VV> implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {
@Override
public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vertexToBeRemoved,
Collector<Vertex<K, VV>> out) throws Exception {
final Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
final Iterator<Vertex<K, VV>> vertexToBeRemovedIterator = vertexToBeRemoved.iterator();
Vertex<K, VV> next;
if (vertexIterator.hasNext()) {
if (!vertexToBeRemovedIterator.hasNext()) {
next = vertexIterator.next();
out.collect(next);
}
}
}
}
@ForwardedFieldsSecond("f0; f1; f2")
private static final class ProjectEdgeToBeRemoved<K,VV,EV> implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
@Override
public Edge<K, EV> join(Vertex<K, VV> vertex, Edge<K, EV> edge) throws Exception {
return edge;
}
}
/**
* Removes all edges that match the given edge from the graph.
*
* @param edge the edge to remove
......@@ -1126,6 +1209,39 @@ public class Graph<K, VV, EV> {
}
}
/**
* Removes all the edges that match the edges in the given data set from the graph.
*
* @param edgesToBeRemoved the data set of edges to be removed
* @return a new graph where the edges have been removed and in which the vertices remained intact
*/
public Graph<K, VV, EV> removeEdges(DataSet<Edge<K, EV>> edgesToBeRemoved) {
DataSet<Edge<K, EV>> newEdges = getEdges().coGroup(edgesToBeRemoved).where(0,1).equalTo(0,1)
.with(new EdgeRemovalCoGroup<K, EV>());
return new Graph<K, VV, EV>(this.vertices, newEdges, context);
}
private static final class EdgeRemovalCoGroup<K,EV> implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
@Override
public void coGroup(Iterable<Edge<K, EV>> edge, Iterable<Edge<K, EV>> edgeToBeRemoved,
Collector<Edge<K, EV>> out) throws Exception {
final Iterator<Edge<K, EV>> edgeIterator = edge.iterator();
final Iterator<Edge<K, EV>> edgeToBeRemovedIterator = edgeToBeRemoved.iterator();
Edge<K, EV> next;
if (edgeIterator.hasNext()) {
if (!edgeToBeRemovedIterator.hasNext()) {
next = edgeIterator.next();
out.collect(next);
}
}
}
}
/**
* Performs union on the vertices and edges sets of the input graphs
* removing duplicate vertices but maintaining duplicate edges.
......
......@@ -21,12 +21,14 @@ package org.apache.flink.graph.test.operations;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.NullValue;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
......@@ -85,6 +87,39 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"6,1,61\n";
}
@Test
public void testAddVertices() throws Exception {
/*
* Test addVertices() -- simple case
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
edges.add(new Edge<Long, Long>(6L, 1L, 61L));
edges.add(new Edge<Long, Long>(7L, 1L, 71L));
DataSet<Vertex<Long, Long>> newVertices = env.fromElements(new Vertex<Long, Long>(6L, 6L),
new Vertex<Long, Long>(7L, 7L));
graph = graph.addVertices(newVertices, edges);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n" +
"6,1,61\n" +
"7,1,71\n";
}
@Test
public void testAddVertexExisting() throws Exception {
/*
......@@ -111,6 +146,72 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"5,1,51\n";
}
@Test
public void testAddVerticesBothExisting() throws Exception {
/*
* Test addVertices() -- add two existing vertices
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
edges.add(new Edge<Long, Long>(1L, 5L, 15L));
edges.add(new Edge<Long, Long>(3L, 1L, 31L));
DataSet<Vertex<Long, Long>> newVertices = env.fromElements(new Vertex<Long, Long>(1L, 1L),
new Vertex<Long, Long>(3L, 3L));
graph = graph.addVertices(newVertices, edges);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"1,5,15\n" +
"2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"3,1,31\n" +
"4,5,45\n" +
"5,1,51\n";
}
@Test
public void testAddVerticesOneExisting() throws Exception {
/*
* Test addVertices() -- add an existing vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
edges.add(new Edge<Long, Long>(1L, 5L, 15L));
edges.add(new Edge<Long, Long>(6L, 1L, 61L));
DataSet<Vertex<Long, Long>> newVertices = env.fromElements(new Vertex<Long, Long>(1L, 1L),
new Vertex<Long, Long>(6L, 6L));
graph = graph.addVertices(newVertices, edges);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"1,5,15\n" +
"2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n" +
"6,1,61\n";
}
@Test
public void testAddVertexNoEdges() throws Exception {
/*
......@@ -134,6 +235,34 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"6,6\n";
}
@Test
public void testAddVerticesNoEdges() throws Exception {
/*
* Test addVertices() -- add vertices with empty edge set
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
DataSet<Vertex<Long, Long>> newVertices = env.fromElements(new Vertex<Long, Long>(6L, 6L),
new Vertex<Long, Long>(7L, 7L));
graph = graph.addVertices(newVertices, edges);
graph.getVertices().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,1\n" +
"2,2\n" +
"3,3\n" +
"4,4\n" +
"5,5\n" +
"6,6\n" +
"7,7\n";
}
@Test
public void testRemoveVertex() throws Exception {
/*
......@@ -154,6 +283,28 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,4,34\n";
}
@Test
public void testRemoveVertices() throws Exception {
/*
* Test removeVertices() -- simple case
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Vertex<Long, Long>> verticesToBeRemoved = env.fromElements(new Vertex<Long, Long>(1L, 1L),
new Vertex<Long, Long>(2L, 2L));
graph = graph.removeVertices(verticesToBeRemoved);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "3,4,34\n" +
"3,5,35\n" +
"4,5,45\n";
}
@Test
public void testRemoveInvalidVertex() throws Exception {
/*
......@@ -176,6 +327,79 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"4,5,45\n" +
"5,1,51\n";
}
@Test
public void testRemoveOneValidOneInvalidVertex() throws Exception {
/*
* Test removeVertices() -- remove one invalid vertex and a valid one
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Vertex<Long, Long>> verticesToBeRemoved = env.fromElements(new Vertex<Long, Long>(1L, 1L),
new Vertex<Long, Long>(7L, 7L));
graph = graph.removeVertices(verticesToBeRemoved);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n";
}
@Test
public void testRemoveBothInvalidVertices() throws Exception {
/*
* Test removeVertices() -- remove two invalid vertices
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Vertex<Long, Long>> verticesToBeRemoved = env.fromElements(new Vertex<Long, Long>(6L, 6L),
new Vertex<Long, Long>(7L, 7L));
graph = graph.removeVertices(verticesToBeRemoved);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n";
}
@Test
public void testRemoveBothInvalidVerticesVertexResult() throws Exception {
/*
* Test removeVertices() -- remove two invalid vertices and verify the data set of vertices
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Vertex<Long, Long>> verticesToBeRemoved = env.fromElements(new Vertex<Long, Long>(6L, 6L),
new Vertex<Long, Long>(7L, 7L));
graph = graph.removeVertices(verticesToBeRemoved);
graph.getVertices().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,1\n" +
"2,2\n" +
"3,3\n" +
"4,4\n" +
"5,5\n";
}
@Test
public void testAddEdge() throws Exception {
......@@ -201,7 +425,39 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"5,1,51\n" +
"6,1,61\n";
}
@Test
public void testAddEdges() throws Exception {
/*
* Test addEdges() -- simple case
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Edge<Long, Long>> edgesToBeAdded = env.fromElements(new Edge<Long, Long>(6L, 1L, 61L),
new Edge<Long, Long>(7L, 1L, 71L));
DataSet<Vertex<Long, Long>> verticesToBeAdded = env.fromElements(new Vertex<Long, Long>(6L, 6L),
new Vertex<Long, Long>(7L, 7L));
graph = graph.addEdges(edgesToBeAdded, verticesToBeAdded);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n" +
"6,1,61\n" +
"7,1,71\n";
}
@Test
public void testAddExistingEdge() throws Exception {
/*
......@@ -226,9 +482,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"4,5,45\n" +
"5,1,51\n";
}
@Test
public void testRemoveVEdge() throws Exception {
public void testRemoveEdge() throws Exception {
/*
* Test removeEdge() -- simple case
*/
......@@ -248,7 +504,56 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,5,35\n" +
"4,5,45\n";
}
@Test
public void testRemoveEdges() throws Exception {
/*
* Test removeEdges() -- simple case
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Edge<Long, Long>> edgesToBeRemoved = env.fromElements(new Edge<Long, Long>(5L, 1L, 51L),
new Edge<Long, Long>(2L, 3L, 23L));
graph = graph.removeEdges(edgesToBeRemoved);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n";
}
@Test
public void testRemoveSameEdgeTwice() throws Exception {
/*
* Test removeEdges() -- try to remove the same edge twice
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Edge<Long, Long>> edgesToBeRemoved = env.fromElements(new Edge<Long, Long>(5L, 1L, 51L),
new Edge<Long, Long>(5L, 1L, 51L));
graph = graph.removeEdges(edgesToBeRemoved);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n";
}
@Test
public void testRemoveInvalidEdge() throws Exception {
/*
......@@ -271,4 +576,30 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"4,5,45\n" +
"5,1,51\n";
}
@Test
public void testRemoveOneValidOneInvalidEdge() throws Exception {
/*
* Test removeEdges() -- one edge is valid, the other is invalid
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Edge<Long, Long>> edgesToBeRemoved = env.fromElements(new Edge<Long, Long>(1L, 1L, 51L),
new Edge<Long, Long>(6L, 1L, 61L));
graph = graph.removeEdges(edgesToBeRemoved);
graph.getEdges().writeAsCsv(resultPath);
env.execute();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n";
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册