From d015bb0f6e9b6c8764cb34dc8085751596b8a901 Mon Sep 17 00:00:00 2001 From: andralungu Date: Mon, 13 Apr 2015 10:26:55 +0200 Subject: [PATCH] [FLINK-1758] [gelly] Replaced groupReduce with reduce --- docs/libs/gelly_guide.md | 45 ++++++++--- .../org/apache/flink/graph/EdgeDirection.java | 6 +- .../java/org/apache/flink/graph/Graph.java | 12 +-- .../flink/graph/ReduceEdgesFunction.java | 3 +- .../flink/graph/ReduceNeighborsFunction.java | 2 +- .../JaccardSimilarityMeasureExample.java | 8 +- .../ReduceOnEdgesMethodsITCase.java | 78 ++++++------------- .../ReduceOnNeighborMethodsITCase.java | 23 +++--- 8 files changed, 92 insertions(+), 85 deletions(-) diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 1a02e6d0291..6390d6fe4ab 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -266,10 +266,13 @@ Neighborhood Methods Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood. -`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `groupReduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors). +`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, +while `groupReduceOnNeighbors()` has access to both the neighboring edges and vertices. The neighborhood scope +is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors). The `groupReduceOnEdges()` and `groupReduceOnNeighbors()` methods return zero, one or more values per vertex. -When returning a single value per vertex, `reduceOnEdges()` or `reduceOnNeighbors()` should be called as they are more efficient. + +When the user-defined function to be applied on the neighborhood is associative and commutative, it is highly advised to use the `reduceOnEdges()` and `reduceOnNeighbors()` methods. These methods exploit combiners internally, significantly improving performance. For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph: @@ -286,22 +289,20 @@ DataSet> minWeights = graph.groupReduceOnEdges( new SelectMinWeight(), EdgeDirection.OUT); // user-defined function to select the minimum weight -static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue> { +static final class SelectMinWeight implements EdgesFunctionWithVertexValue> { @Override public void iterateEdges(Vertex v, Iterable> edges, Collector> out) throws Exception { - long weight = Long.MAX_VALUE; - long minNeighborId = 0; + long minWeight = Long.MAX_VALUE; for (Edge edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighborId = edge.getTarget(); + if (edge.getValue() < minWeight) { + minWeight = edge.getValue(); } } - out.collect(new Tuple2(v.getId(), minNeighborId)); + out.collect(new Tuple2(v.getId(), minWeight)); } } {% endhighlight %} @@ -335,6 +336,32 @@ static final class SumValues implements ReduceNeighborsFunction

+The following code will collect the in-edges for each vertex and apply the `SumInNeighbors()` user-defined function on each of the resulting neighborhoods: + +{% highlight java %} +Graph graph = ... + +DataSet> verticesWithSum = + graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); + +// user-defined function to sum up the in-neighbor values. +static final class SumInNeighbors implements NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + for (Tuple2, Vertex> neighbor : neighbors) { + sum += neighbor.f0.getValue() * neighbor.f1.getValue(); + } + out.collect(new Tuple2(vertex.getId(), sum)); + } +} +{% endhighlight %} + When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead. [Back to top](#top) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java index 65d40986f28..0a055bbf4a1 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java @@ -22,8 +22,10 @@ package org.apache.flink.graph; * The EdgeDirection is used to select a node's neighborhood * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)}, * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}, - * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)} and - * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)} + * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)}, + * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}, + * {@link Graph#reduceOnEdges(ReduceEdgesFunction, EdgeDirection)} and + * {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)} * methods. */ public enum EdgeDirection { diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 1325e0cc931..48d39b1e62a 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -724,7 +724,7 @@ public class Graph & Serializable, VV extends Serializab * @throws IllegalArgumentException */ public DataSet groupReduceOnEdges(EdgesFunctionWithVertexValue edgesFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: @@ -755,7 +755,7 @@ public class Graph & Serializable, VV extends Serializab * @throws IllegalArgumentException */ public DataSet groupReduceOnEdges(EdgesFunction edgesFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: @@ -1240,7 +1240,7 @@ public class Graph & Serializable, VV extends Serializab * @throws IllegalArgumentException */ public DataSet groupReduceOnNeighbors(NeighborsFunctionWithVertexValue neighborsFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: // create pairs @@ -1283,7 +1283,7 @@ public class Graph & Serializable, VV extends Serializab * @throws IllegalArgumentException */ public DataSet groupReduceOnNeighbors(NeighborsFunction neighborsFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: // create pairs @@ -1437,7 +1437,7 @@ public class Graph & Serializable, VV extends Serializab * @throws IllegalArgumentException */ public DataSet reduceOnNeighbors(ReduceNeighborsFunction reduceNeighborsFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: // create pairs @@ -1504,7 +1504,7 @@ public class Graph & Serializable, VV extends Serializab * @throws IllegalArgumentException */ public DataSet reduceOnEdges(ReduceEdgesFunction reduceEdgesFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java index 0b5d2cf918d..53c7934e520 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java @@ -18,6 +18,7 @@ package org.apache.flink.graph; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.java.tuple.Tuple2; import java.io.Serializable; @@ -30,7 +31,7 @@ import java.io.Serializable; * @param the edge value type */ public interface ReduceEdgesFunction & Serializable, - EV extends Serializable> { + EV extends Serializable> extends Function, Serializable { Tuple2> reduceEdges(Tuple2> firstEdge, Tuple2> secondEdge); } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java index 50c0d355710..f5e978f093a 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java @@ -36,5 +36,5 @@ public interface ReduceNeighborsFunction & Serializable EV extends Serializable> extends Function, Serializable { Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, - Tuple3, Vertex> secondNeighbor); + Tuple3, Vertex> secondNeighbor); } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java index c81aeb3b74e..2783a29ce22 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java @@ -32,6 +32,7 @@ import org.apache.flink.graph.EdgesFunction; import org.apache.flink.graph.Triplet; import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; import java.util.HashSet; @@ -68,7 +69,7 @@ public class JaccardSimilarityMeasureExample implements ProgramDescription { Graph graph = Graph.fromDataSet(edges, env); DataSet>> verticesWithNeighbors = - graph.reduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL); + graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL); Graph, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env); @@ -106,7 +107,8 @@ public class JaccardSimilarityMeasureExample implements ProgramDescription { private static final class GatherNeighbors implements EdgesFunction>> { @Override - public Vertex> iterateEdges(Iterable>> edges) throws Exception { + public void iterateEdges(Iterable>> edges, + Collector>> out) throws Exception { HashSet neighborsHashSet = new HashSet(); long vertexId = -1; @@ -115,7 +117,7 @@ public class JaccardSimilarityMeasureExample implements ProgramDescription { neighborsHashSet.add(getNeighborID(edge)); vertexId = edge.f0; } - return new Vertex>(vertexId, neighborsHashSet); + out.collect(new Vertex>(vertexId, neighborsHashSet)); } } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java index 2452cba0451..3ace49a3826 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -78,9 +78,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { env.execute(); expectedResult = "1,2\n" + - "2,3\n" + + "2,3\n" + "3,4\n" + - "4,5\n" + + "4,5\n" + "5,1\n"; } @@ -335,8 +335,8 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Test public void testLowestWeightOutNeighborNoValue() throws Exception { /* - * Get the lowest-weight out-neighbor - * for each vertex + * Get the lowest-weight out of all the out-neighbors + * of each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), @@ -347,33 +347,33 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { verticesWithLowestOutNeighbor.writeAsCsv(resultPath); env.execute(); - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; + expectedResult = "1,12\n" + + "2,23\n" + + "3,34\n" + + "4,45\n" + + "5,51\n"; } @Test public void testLowestWeightInNeighborNoValue() throws Exception { /* - * Get the lowest-weight in-neighbor - * for each vertex + * Get the lowest-weight out of all the in-neighbors + * of each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithLowestOutNeighbor = - graph.groupReduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); + graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); verticesWithLowestOutNeighbor.writeAsCsv(resultPath); env.execute(); - expectedResult = "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; + expectedResult = "1,51\n" + + "2,12\n" + + "3,13\n" + + "4,34\n" + + "5,35\n"; } @Test @@ -455,25 +455,6 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @SuppressWarnings("serial") private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction { -// @Override -// public void iterateEdges(Iterable>> edges, -// Collector> out) throws Exception { -// -// long weight = Long.MIN_VALUE; -// long vertexId = -1; -// long i=0; -// -// for (Tuple2> edge: edges) { -// if (edge.f1.getValue() > weight) { -// weight = edge.f1.getValue(); -// } -// if (i==0) { -// vertexId = edge.f0; -// } i++; -// } -// out.collect(new Tuple2(vertexId, weight)); -// } - @Override public Tuple2> reduceEdges(Tuple2> firstEdge, Tuple2> secondEdge) { @@ -506,27 +487,16 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction> { + private static final class SelectMinWeightInNeighborNoValue implements ReduceEdgesFunction { @Override - public void iterateEdges(Iterable>> edges, - Collector> out) throws Exception { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getSource(); - } - if (i==0) { - vertexId = edge.f0; - } i++; + public Tuple2> reduceEdges(Tuple2> firstEdge, + Tuple2> secondEdge) { + if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) { + return firstEdge; + } else { + return secondEdge; } - out.collect(new Tuple2(vertexId, minNeighorId)); } } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java index 5300d24405f..5f235699e5f 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java @@ -20,7 +20,6 @@ package org.apache.flink.graph.test.operations; import java.util.Iterator; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; @@ -227,7 +226,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSum = - graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); + graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); verticesWithSum.writeAsCsv(resultPath); env.execute(); @@ -536,15 +535,21 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumInNeighborsNoValue implements ReduceNeighborsFunction { + private static final class SumInNeighborsNoValue implements NeighborsFunction> { @Override - public Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, - Tuple3, Vertex> secondNeighbor) { - long sum = firstNeighbor.f2.getValue() * firstNeighbor.f1.getValue() + - secondNeighbor.f2.getValue() * secondNeighbor.f1.getValue(); - return new Tuple3, Vertex>(firstNeighbor.f0, firstNeighbor.f1, - new Vertex(firstNeighbor.f0, sum)); + public void iterateNeighbors(Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + long sum = 0; + Tuple3, Vertex> next = null; + Iterator, Vertex>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue() * next.f1.getValue(); + } + out.collect(new Tuple2(next.f0, sum)); } } -- GitLab