提交 3ade4c79 编写于 作者: M Martin Junghanns 提交者: vasia

[FLINK-2905] [gelly] Add Graph Intersection method

This closes #1329
上级 c6075495
......@@ -485,13 +485,66 @@ val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Do
* <strong>Undirected</strong>: In Gelly, a `Graph` is always directed. Undirected graphs can be represented by adding all opposite-direction edges to a graph. For this purpose, Gelly provides the `getUndirected()` method.
* <strong>Union</strong>: Gelly's `union()` method performs a union operation on the vertex and edge sets of the specified graph and current graph. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exists, these will be maintained.
* <strong>Union</strong>: Gelly's `union()` method performs a union operation on the vertex and edge sets of the specified graph and the current graph. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exist, these will be preserved.
<p class="text-center">
<img alt="Union Transformation" width="50%" src="fig/gelly-union.png"/>
</p>
* <strong>Difference</strong>: Gelly's `difference()` method performs a difference on the vertex and edge sets of the current graph and specified graph.
* <strong>Difference</strong>: Gelly's `difference()` method performs a difference on the vertex and edge sets of the current graph and the specified graph.
* <strong>Intersect</strong>: Gelly's `intersect()` method performs an intersect on the edge
sets of the current graph and the specified graph. The result is a new `Graph` that contains all
edges that exist in both input graphs. Two edges are considered equal, if they have the same source
identifier, target identifier and edge value. Vertices in the resulting graph have no
value. If vertex values are required, one can for example retrieve them from one of the input graphs using
the `joinWithVertices()` method.
Depending on the parameter `distinct`, equal edges are either contained once in the resulting
`Graph` or as often as there are pairs of equal edges in the input graphs.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create first graph from edges {(1, 3, 12) (1, 3, 13), (1, 3, 13)}
List<Edge<Long, Long>> edges1 = ...
Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
// create second graph from edges {(1, 3, 13)}
List<Edge<Long, Long>> edges2 = ...
Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
// Using distinct = true results in {(1,3,13)}
Graph<Long, NullValue, Long> intersect1 = graph1.intersect(graph2, true);
// Using distinct = false results in {(1,3,13),(1,3,13)} as there is one edge pair
Graph<Long, NullValue, Long> intersect2 = graph1.intersect(graph2, false);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment
// create first graph from edges {(1, 3, 12) (1, 3, 13), (1, 3, 13)}
val edges1: List[Edge[Long, Long]] = ...
val graph1 = Graph.fromCollection(edges1, env)
// create second graph from edges {(1, 3, 13)}
val edges2: List[Edge[Long, Long]] = ...
val graph2 = Graph.fromCollection(edges2, env)
// Using distinct = true results in {(1,3,13)}
val intersect1 = graph1.intersect(graph2, true)
// Using distinct = false results in {(1,3,13),(1,3,13)} as there is one edge pair
val intersect2 = graph1.intersect(graph2, false)
{% endhighlight %}
</div>
</div>
-[Back to top](#top)
......
......@@ -967,6 +967,29 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
wrapGraph(jgraph.difference(graph.getWrappedGraph))
}
/**
* Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they
* have the same source identifier, target identifier and edge value.
* <p>
* The method computes pairs of equal edges from the input graphs. If the same edge occurs
* multiple times in the input graphs, there will be multiple edge pairs to be considered. Each
* edge instance can only be part of one pair. If the given parameter {@code distinctEdges} is set
* to {@code true}, there will be exactly one edge in the output graph representing all pairs of
* equal edges. If the parameter is set to {@code false}, both edges of each pair will be in the
* output.
* <p>
* Vertices in the output graph will have no vertex values.
*
* @param graph the graph to perform intersect with
* @param distinctEdges if set to { @code true}, there will be exactly one edge in the output
* graph representing all pairs of equal edges, otherwise, for each pair,
* both edges will be in the output graph
* @return a new graph which contains only common vertices and edges from the input graphs
*/
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV] = {
wrapGraph(jgraph.intersect(graph.getWrappedGraph, distinctEdges))
}
/**
* Compute a reduce transformation over the neighbors' vertex values of each vertex.
* For each vertex, the transformation consecutively calls a
......
......@@ -60,8 +60,8 @@ import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.apache.flink.graph.validation.GraphValidator;
import org.apache.flink.util.Collector;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
/**
* Represents a Graph consisting of {@link Edge edges} and {@link Vertex
......@@ -174,7 +174,7 @@ public class Graph<K, VV, EV> {
}
private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
Edge<K, EV>, Vertex<K, NullValue>> {
Edge<K, EV>, Vertex<K, NullValue>> {
public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
......@@ -1477,7 +1477,6 @@ public class Graph<K, VV, EV> {
* @return a new graph
*/
public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct();
DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges());
return new Graph<K, VV, EV>(unionedVertices, unionedEdges, this.context);
......@@ -1496,6 +1495,95 @@ public class Graph<K, VV, EV> {
return this.removeVertices(removeVerticesData);
}
/**
* Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they
* have the same source identifier, target identifier and edge value.
* <p>
* The method computes pairs of equal edges from the input graphs. If the same edge occurs
* multiple times in the input graphs, there will be multiple edge pairs to be considered. Each
* edge instance can only be part of one pair. If the given parameter {@code distinctEdges} is set
* to {@code true}, there will be exactly one edge in the output graph representing all pairs of
* equal edges. If the parameter is set to {@code false}, both edges of each pair will be in the
* output.
* <p>
* Vertices in the output graph will have no vertex values.
*
* @param graph the graph to perform intersect with
* @param distinctEdges if set to {@code true}, there will be exactly one edge in the output graph
* representing all pairs of equal edges, otherwise, for each pair, both
* edges will be in the output graph
* @return a new graph which contains only common vertices and edges from the input graphs
*/
public Graph<K, NullValue, EV> intersect(Graph<K, VV, EV> graph, boolean distinctEdges) {
DataSet<Edge<K, EV>> intersectEdges;
if (distinctEdges) {
intersectEdges = getDistinctEdgeIntersection(graph.getEdges());
} else {
intersectEdges = getPairwiseEdgeIntersection(graph.getEdges());
}
return Graph.fromDataSet(intersectEdges, getContext());
}
/**
* Computes the intersection between the edge set and the given edge set. For all matching pairs,
* only one edge will be in the resulting data set.
*
* @param edges edges to compute intersection with
* @return edge set containing one edge for all matching pairs of the same edge
*/
private DataSet<Edge<K, EV>> getDistinctEdgeIntersection(DataSet<Edge<K, EV>> edges) {
return this.getEdges()
.join(edges)
.where(0, 1, 2)
.equalTo(0, 1, 2)
.with(new JoinFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>>() {
@Override
public Edge<K, EV> join(Edge<K, EV> first, Edge<K, EV> second) throws Exception {
return first;
}
}).withForwardedFieldsFirst("*")
.distinct();
}
/**
* Computes the intersection between the edge set and the given edge set. For all matching pairs, both edges will be
* in the resulting data set.
*
* @param edges edges to compute intersection with
* @return edge set containing both edges from all matching pairs of the same edge
*/
private DataSet<Edge<K, EV>> getPairwiseEdgeIntersection(DataSet<Edge<K, EV>> edges) {
return this.getEdges()
.coGroup(edges)
.where(0, 1, 2)
.equalTo(0, 1, 2)
.with(new MatchingEdgeReducer<K, EV>());
}
/**
* As long as both input iterables have more edges, the reducer outputs each edge of a pair.
*
* @param <K> vertex identifier type
* @param <EV> edge value type
*/
private static final class MatchingEdgeReducer<K, EV>
implements CoGroupFunction<Edge<K,EV>, Edge<K,EV>, Edge<K, EV>> {
@Override
public void coGroup(Iterable<Edge<K, EV>> edgesLeft, Iterable<Edge<K, EV>> edgesRight, Collector<Edge<K, EV>> out)
throws Exception {
Iterator<Edge<K, EV>> leftIt = edgesLeft.iterator();
Iterator<Edge<K, EV>> rightIt = edgesRight.iterator();
// collect pairs once
while(leftIt.hasNext() && rightIt.hasNext()) {
out.collect(leftIt.next());
out.collect(rightIt.next());
}
}
}
/**
* Runs a Vertex-Centric iteration on the graph.
* No configuration options are provided.
......
......@@ -21,9 +21,12 @@ package org.apache.flink.graph.test.operations;
import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
......@@ -31,6 +34,7 @@ import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
......@@ -302,7 +306,6 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expectedResult);
}
@Test
public void testDifferenceVertices() throws Exception{
/*Test difference() method by checking the output for getVertices() on the resultant graph
......@@ -355,6 +358,91 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expectedResult);
}
@Test
public final void testIntersect() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
List<Edge<Long, Long>> edges1 = Lists.newArrayList(
new Edge<>(1L, 3L, 12L),
new Edge<>(1L, 3L, 13L), // needs to be in the output
new Edge<>(1L, 3L, 14L)
);
@SuppressWarnings("unchecked")
List<Edge<Long, Long>> edges2 = Lists.newArrayList(
new Edge<>(1L, 3L, 13L)
);
Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
Graph<Long, NullValue, Long> intersect = graph1.intersect(graph2, true);
List<Vertex<Long, NullValue>> vertices = Lists.newArrayList();
List<Edge<Long, Long>> edges = Lists.newArrayList();
intersect.getVertices().output(new LocalCollectionOutputFormat<>(vertices));
intersect.getEdges().output(new LocalCollectionOutputFormat<>(edges));
env.execute();
String expectedVertices = "1,(null)\n" +
"3,(null)\n";
String expectedEdges = "1,3,13\n";
compareResultAsTuples(vertices, expectedVertices);
compareResultAsTuples(edges, expectedEdges);
}
@Test
public final void testIntersectWithPairs() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
List<Edge<Long, Long>> edges1 = Lists.newArrayList(
new Edge<>(1L, 3L, 12L),
new Edge<>(1L, 3L, 13L),
new Edge<>(1L, 3L, 13L), // output
new Edge<>(1L, 3L, 13L), // output
new Edge<>(1L, 3L, 14L) // output
);
@SuppressWarnings("unchecked")
List<Edge<Long, Long>> edges2 = Lists.newArrayList(
new Edge<>(1L, 3L, 13L), // output
new Edge<>(1L, 3L, 13L), // output
new Edge<>(1L, 3L, 14L) // output
);
Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
Graph<Long, NullValue, Long> intersect = graph1.intersect(graph2, false);
List<Vertex<Long, NullValue>> vertices = Lists.newArrayList();
List<Edge<Long, Long>> edges = Lists.newArrayList();
intersect.getVertices().output(new LocalCollectionOutputFormat<>(vertices));
intersect.getEdges().output(new LocalCollectionOutputFormat<>(edges));
env.execute();
String expectedVertices = "1,(null)\n" +
"3,(null)\n";
String expectedEdges = "1,3,13\n" +
"1,3,13\n" +
"1,3,13\n" +
"1,3,13\n" +
"1,3,14\n" +
"1,3,14";
compareResultAsTuples(vertices, expectedVertices);
compareResultAsTuples(edges, expectedEdges);
}
@Test
public void testTriplets() throws Exception {
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册