diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index 363ad2e1512464ddd3b2566e14161cd855ce5023..9fef221b17b2a0e0897b5e0de95587ea81a84677 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -21,9 +21,9 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; import org.apache.flink.api.java.tuple.Tuple3; @@ -122,7 +122,7 @@ extends GraphAlgorithmDelegatingDataSet> { .setParallelism(parallelism) .name("Emit and flip edge") .groupBy(0, 1) - .reduce(new ReduceBitmask()) + .reduceGroup(new ReduceBitmask()) .setParallelism(parallelism) .name("Reduce bitmask"); @@ -177,13 +177,23 @@ extends GraphAlgorithmDelegatingDataSet> { * * @param ID type */ - private static class ReduceBitmask - implements ReduceFunction> { + @ForwardedFields("0; 1") + private static final class ReduceBitmask + implements GroupReduceFunction, Tuple3> { @Override - public Tuple3 reduce(Tuple3 left, Tuple3 right) + public void reduce(Iterable> values, Collector> out) throws Exception { - left.f2.setValue((byte)(left.f2.getValue() | right.f2.getValue())); - return left; + Tuple3 output = null; + + byte bitmask = 0; + + for (Tuple3 value: values) { + output = value; + bitmask |= value.f2.getValue(); + } + + output.f2.setValue(bitmask); + out.collect(output); } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java index 75f23699684544c2739af4bef30ab6325e10c487..f7ac18b6054e114deddbce937e386eee820102d8 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java @@ -18,6 +18,7 @@ package org.apache.flink.graph.asm.degree.annotate.directed; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -119,6 +120,7 @@ extends GraphAlgorithmDelegatingDataSet> { DataSet> targetDegree = targetIds .groupBy(0) .reduce(new DegreeCount()) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Degree count"); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java index b0576f8a2c13dfda374fe63e238af188eab805f7..e235f6aa3654c44843b4a316c05ba36502bdd58d 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java @@ -18,6 +18,7 @@ package org.apache.flink.graph.asm.degree.annotate.directed; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -119,6 +120,7 @@ extends GraphAlgorithmDelegatingDataSet> { DataSet> sourceDegree = sourceIds .groupBy(0) .reduce(new DegreeCount()) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Degree count"); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index ec72222f73e3fbb697341a059251e3dd02c47a2f..42f084d7f872f09400d5fe4a379012a3626f98b4 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.asm.degree.annotate.undirected; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.graph.Edge; @@ -143,6 +144,7 @@ extends GraphAlgorithmDelegatingDataSet> { DataSet> degree = vertexIds .groupBy(0) .reduce(new DegreeCount()) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Degree count"); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java index a7b5ce9ecdfbe9cbb6fdaeb598234f416faac63f..01cb2d1cfcf87d491af8109f2c94e432642433ba 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java @@ -50,12 +50,10 @@ public class GraphGeneratorUtils { .setParallelism(parallelism) .name("Vertex iterators"); - DataSet> vertexSequence = vertexLabels + return vertexLabels .map(new CreateVertex()) .setParallelism(parallelism) .name("Vertex sequence"); - - return vertexSequence; } @ForwardedFields("*->f0") @@ -73,7 +71,7 @@ public class GraphGeneratorUtils { } } - /**************************************************************************/ + // -------------------------------------------------------------------------------------------- /** * Generates {@link Vertex Vertices} present in the given set of {@link Edge}s. @@ -84,7 +82,7 @@ public class GraphGeneratorUtils { * @param edge value type * @return {@link DataSet} of discovered {@link Vertex Vertices} * - * @see {@link Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)} + * @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment) */ public static DataSet> vertexSet(DataSet> edges, int parallelism) { DataSet> vertexSet = edges @@ -92,16 +90,14 @@ public class GraphGeneratorUtils { .setParallelism(parallelism) .name("Emit source and target labels"); - DataSet> distinctVertexSet = vertexSet + return vertexSet .distinct() .setParallelism(parallelism) .name("Emit vertex labels"); - - return distinctVertexSet; } /** - * @see {@link Graph.EmitSrcAndTarget} + * @see Graph.EmitSrcAndTarget */ private static final class EmitSrcAndTarget implements FlatMapFunction, Vertex> { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java index 537ad0fcded9e177f49bd3806e8a713c36f29b1d..e0defcd585de17423dd27614a0034ac46071f9d4 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -21,6 +21,7 @@ package org.apache.flink.graph.library.clustering.directed; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.tuple.Tuple2; @@ -121,6 +122,7 @@ extends GraphAlgorithmDelegatingDataSet> { DataSet> vertexTriangleCount = triangleVertices .groupBy(0) .reduce(new CountTriangles()) + .setCombineHint(CombineHint.HASH) .name("Count triangles"); // u, deg(u) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index 8f707fd37d6683c02356781d18e026ab609cd1f1..cd859d965406db8575a9e73d16cb58a3a187e75d 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -21,6 +21,7 @@ package org.apache.flink.graph.library.clustering.undirected; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.tuple.Tuple2; @@ -122,6 +123,7 @@ extends GraphAlgorithmDelegatingDataSet> { DataSet> vertexTriangleCount = triangleVertices .groupBy(0) .reduce(new CountTriangles()) + .setCombineHint(CombineHint.HASH) .name("Count triangles"); // u, deg(u) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java index b88badbebb5fd798f3d6006e842af3a24d86f688..60e99bd80d4395f00a99cf54f59fb32af63f0d1f 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichJoinFunction; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; @@ -169,6 +170,7 @@ extends GraphAlgorithmDelegatingDataSet> { .name("Initial scores") .groupBy(0) .reduce(new SumScores()) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); @@ -185,6 +187,7 @@ extends GraphAlgorithmDelegatingDataSet> { .name("Hub") .groupBy(0) .reduce(new SumScore()) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); @@ -194,6 +197,7 @@ extends GraphAlgorithmDelegatingDataSet> { .setParallelism(parallelism) .name("Square") .reduce(new Sum()) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); @@ -207,6 +211,7 @@ extends GraphAlgorithmDelegatingDataSet> { .name("Authority") .groupBy(0) .reduce(new SumScore()) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); @@ -216,6 +221,7 @@ extends GraphAlgorithmDelegatingDataSet> { .setParallelism(parallelism) .name("Square") .reduce(new Sum()) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum");