提交 d015bb0f 编写于 作者: A andralungu 提交者: vasia

[FLINK-1758] [gelly] Replaced groupReduce with reduce

上级 9de640af
......@@ -266,10 +266,13 @@ Neighborhood Methods
Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood.
`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `groupReduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors).
`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex,
while `groupReduceOnNeighbors()` has access to both the neighboring edges and vertices. The neighborhood scope
is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors).
The `groupReduceOnEdges()` and `groupReduceOnNeighbors()` methods return zero, one or more values per vertex.
When returning a single value per vertex, `reduceOnEdges()` or `reduceOnNeighbors()` should be called as they are more efficient.
When the user-defined function to be applied on the neighborhood is associative and commutative, it is highly advised to use the `reduceOnEdges()` and `reduceOnNeighbors()` methods. These methods exploit combiners internally, significantly improving performance.
For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph:
......@@ -286,22 +289,20 @@ DataSet<Tuple2<Long, Double>> minWeights = graph.groupReduceOnEdges(
new SelectMinWeight(), EdgeDirection.OUT);
// user-defined function to select the minimum weight
static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
static final class SelectMinWeight implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateEdges(Vertex<Long, Long> v,
Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
long weight = Long.MAX_VALUE;
long minNeighborId = 0;
long minWeight = Long.MAX_VALUE;
for (Edge<Long, Long> edge: edges) {
if (edge.getValue() < weight) {
weight = edge.getValue();
minNeighborId = edge.getTarget();
if (edge.getValue() < minWeight) {
minWeight = edge.getValue();
}
}
out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
out.collect(new Tuple2<Long, Long>(v.getId(), minWeight));
}
}
{% endhighlight %}
......@@ -335,6 +336,32 @@ static final class SumValues implements ReduceNeighborsFunction<Long, Long, Doub
<img alt="reduceOnNeighbors Example" width="70%" src="img/gelly-reduceOnNeighbors.png"/>
</p>
The following code will collect the in-edges for each vertex and apply the `SumInNeighbors()` user-defined function on each of the resulting neighborhoods:
{% highlight java %}
Graph<Long, Long, Double> graph = ...
DataSet<Tuple2<Long, Long>> verticesWithSum =
graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
// user-defined function to sum up the in-neighbor values.
static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f0.getValue() * neighbor.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
}
}
{% endhighlight %}
When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead.
[Back to top](#top)
......
......@@ -22,8 +22,10 @@ package org.apache.flink.graph;
* The EdgeDirection is used to select a node's neighborhood
* by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)},
* {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
* {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)} and
* {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
* {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)},
* {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)},
* {@link Graph#reduceOnEdges(ReduceEdgesFunction, EdgeDirection)} and
* {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)}
* methods.
*/
public enum EdgeDirection {
......
......@@ -724,7 +724,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @throws IllegalArgumentException
*/
public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
EdgeDirection direction) throws IllegalArgumentException {
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
......@@ -755,7 +755,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @throws IllegalArgumentException
*/
public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
EdgeDirection direction) throws IllegalArgumentException {
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
......@@ -1240,7 +1240,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @throws IllegalArgumentException
*/
public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
EdgeDirection direction) throws IllegalArgumentException {
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
......@@ -1283,7 +1283,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @throws IllegalArgumentException
*/
public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
EdgeDirection direction) throws IllegalArgumentException {
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
......@@ -1437,7 +1437,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @throws IllegalArgumentException
*/
public DataSet reduceOnNeighbors(ReduceNeighborsFunction<K, VV, EV> reduceNeighborsFunction,
EdgeDirection direction) throws IllegalArgumentException {
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
......@@ -1504,7 +1504,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @throws IllegalArgumentException
*/
public DataSet reduceOnEdges(ReduceEdgesFunction<K, EV> reduceEdgesFunction,
EdgeDirection direction) throws IllegalArgumentException {
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
......
......@@ -18,6 +18,7 @@
package org.apache.flink.graph;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.tuple.Tuple2;
import java.io.Serializable;
......@@ -30,7 +31,7 @@ import java.io.Serializable;
* @param <EV> the edge value type
*/
public interface ReduceEdgesFunction<K extends Comparable<K> & Serializable,
EV extends Serializable> {
EV extends Serializable> extends Function, Serializable {
Tuple2<K, Edge<K, EV>> reduceEdges(Tuple2<K, Edge<K, EV>> firstEdge, Tuple2<K, Edge<K, EV>> secondEdge);
}
......@@ -36,5 +36,5 @@ public interface ReduceNeighborsFunction <K extends Comparable<K> & Serializable
EV extends Serializable> extends Function, Serializable {
Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduceNeighbors(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> firstNeighbor,
Tuple3<K, Edge<K, EV>, Vertex<K, VV>> secondNeighbor);
Tuple3<K, Edge<K, EV>, Vertex<K, VV>> secondNeighbor);
}
......@@ -32,6 +32,7 @@ import org.apache.flink.graph.EdgesFunction;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
import java.util.HashSet;
......@@ -68,7 +69,7 @@ public class JaccardSimilarityMeasureExample implements ProgramDescription {
Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
graph.reduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env);
......@@ -106,7 +107,8 @@ public class JaccardSimilarityMeasureExample implements ProgramDescription {
private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
@Override
public Vertex<Long, HashSet<Long>> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges) throws Exception {
public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
Collector<Vertex<Long, HashSet<Long>>> out) throws Exception {
HashSet<Long> neighborsHashSet = new HashSet<Long>();
long vertexId = -1;
......@@ -115,7 +117,7 @@ public class JaccardSimilarityMeasureExample implements ProgramDescription {
neighborsHashSet.add(getNeighborID(edge));
vertexId = edge.f0;
}
return new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet);
out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet));
}
}
......
......@@ -78,9 +78,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
env.execute();
expectedResult = "1,2\n" +
"2,3\n" +
"2,3\n" +
"3,4\n" +
"4,5\n" +
"4,5\n" +
"5,1\n";
}
......@@ -335,8 +335,8 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Test
public void testLowestWeightOutNeighborNoValue() throws Exception {
/*
* Get the lowest-weight out-neighbor
* for each vertex
* Get the lowest-weight out of all the out-neighbors
* of each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
......@@ -347,33 +347,33 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
env.execute();
expectedResult = "1,2\n" +
"2,3\n" +
"3,4\n" +
"4,5\n" +
"5,1\n";
expectedResult = "1,12\n" +
"2,23\n" +
"3,34\n" +
"4,45\n" +
"5,51\n";
}
@Test
public void testLowestWeightInNeighborNoValue() throws Exception {
/*
* Get the lowest-weight in-neighbor
* for each vertex
* Get the lowest-weight out of all the in-neighbors
* of each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
graph.groupReduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
env.execute();
expectedResult = "1,5\n" +
"2,1\n" +
"3,1\n" +
"4,3\n" +
"5,3\n";
expectedResult = "1,51\n" +
"2,12\n" +
"3,13\n" +
"4,34\n" +
"5,35\n";
}
@Test
......@@ -455,25 +455,6 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
// @Override
// public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
// Collector<Tuple2<Long, Long>> out) throws Exception {
//
// long weight = Long.MIN_VALUE;
// long vertexId = -1;
// long i=0;
//
// for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
// if (edge.f1.getValue() > weight) {
// weight = edge.f1.getValue();
// }
// if (i==0) {
// vertexId = edge.f0;
// } i++;
// }
// out.collect(new Tuple2<Long, Long>(vertexId, weight));
// }
@Override
public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
Tuple2<Long, Edge<Long, Long>> secondEdge) {
......@@ -506,27 +487,16 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
private static final class SelectMinWeightInNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
@Override
public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
Collector<Tuple2<Long, Long>> out) throws Exception {
long weight = Long.MAX_VALUE;
long minNeighorId = 0;
long vertexId = -1;
long i=0;
for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
if (edge.f1.getValue() < weight) {
weight = edge.f1.getValue();
minNeighorId = edge.f1.getSource();
}
if (i==0) {
vertexId = edge.f0;
} i++;
public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
Tuple2<Long, Edge<Long, Long>> secondEdge) {
if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) {
return firstEdge;
} else {
return secondEdge;
}
out.collect(new Tuple2<Long, Long>(vertexId, minNeighorId));
}
}
......
......@@ -20,7 +20,6 @@ package org.apache.flink.graph.test.operations;
import java.util.Iterator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -227,7 +226,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSum =
graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
verticesWithSum.writeAsCsv(resultPath);
env.execute();
......@@ -536,15 +535,21 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
private static final class SumInNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
Tuple2<Long, Long>> {
@Override
public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
long sum = firstNeighbor.f2.getValue() * firstNeighbor.f1.getValue() +
secondNeighbor.f2.getValue() * secondNeighbor.f1.getValue();
return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
new Vertex<Long, Long>(firstNeighbor.f0, sum));
public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
neighbors.iterator();
while(neighborsIterator.hasNext()) {
next = neighborsIterator.next();
sum += next.f2.getValue() * next.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(next.f0, sum));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册