提交 e2ef74ea 编写于 作者: G Greg Hogan

[FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms

This closes #2248
上级 54f02ec7
......@@ -21,9 +21,9 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.tuple.Tuple3;
......@@ -122,7 +122,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
.setParallelism(parallelism)
.name("Emit and flip edge")
.groupBy(0, 1)
.reduce(new ReduceBitmask<K>())
.reduceGroup(new ReduceBitmask<K>())
.setParallelism(parallelism)
.name("Reduce bitmask");
......@@ -177,13 +177,23 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
*
* @param <T> ID type
*/
private static class ReduceBitmask<T>
implements ReduceFunction<Tuple3<T, T, ByteValue>> {
@ForwardedFields("0; 1")
private static final class ReduceBitmask<T>
implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple3<T, T, ByteValue>> {
@Override
public Tuple3<T, T, ByteValue> reduce(Tuple3<T, T, ByteValue> left, Tuple3<T, T, ByteValue> right)
public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple3<T, T, ByteValue>> out)
throws Exception {
left.f2.setValue((byte)(left.f2.getValue() | right.f2.getValue()));
return left;
Tuple3<T, T, ByteValue> output = null;
byte bitmask = 0;
for (Tuple3<T, T, ByteValue> value: values) {
output = value;
bitmask |= value.f2.getValue();
}
output.f2.setValue(bitmask);
out.collect(output);
}
}
......
......@@ -18,6 +18,7 @@
package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
......@@ -119,6 +120,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
DataSet<Vertex<K, LongValue>> targetDegree = targetIds
.groupBy(0)
.reduce(new DegreeCount<K>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Degree count");
......
......@@ -18,6 +18,7 @@
package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
......@@ -119,6 +120,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
.groupBy(0)
.reduce(new DegreeCount<K>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Degree count");
......
......@@ -19,6 +19,7 @@
package org.apache.flink.graph.asm.degree.annotate.undirected;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.Edge;
......@@ -143,6 +144,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
DataSet<Vertex<K, LongValue>> degree = vertexIds
.groupBy(0)
.reduce(new DegreeCount<K>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Degree count");
......
......@@ -50,12 +50,10 @@ public class GraphGeneratorUtils {
.setParallelism(parallelism)
.name("Vertex iterators");
DataSet<Vertex<LongValue,NullValue>> vertexSequence = vertexLabels
return vertexLabels
.map(new CreateVertex())
.setParallelism(parallelism)
.name("Vertex sequence");
return vertexSequence;
}
@ForwardedFields("*->f0")
......@@ -73,7 +71,7 @@ public class GraphGeneratorUtils {
}
}
/**************************************************************************/
// --------------------------------------------------------------------------------------------
/**
* Generates {@link Vertex Vertices} present in the given set of {@link Edge}s.
......@@ -84,7 +82,7 @@ public class GraphGeneratorUtils {
* @param <EV> edge value type
* @return {@link DataSet} of discovered {@link Vertex Vertices}
*
* @see {@link Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)}
* @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)
*/
public static <K,EV> DataSet<Vertex<K,NullValue>> vertexSet(DataSet<Edge<K,EV>> edges, int parallelism) {
DataSet<Vertex<K,NullValue>> vertexSet = edges
......@@ -92,16 +90,14 @@ public class GraphGeneratorUtils {
.setParallelism(parallelism)
.name("Emit source and target labels");
DataSet<Vertex<K,NullValue>> distinctVertexSet = vertexSet
return vertexSet
.distinct()
.setParallelism(parallelism)
.name("Emit vertex labels");
return distinctVertexSet;
}
/**
* @see {@link Graph.EmitSrcAndTarget}
* @see Graph.EmitSrcAndTarget
*/
private static final class EmitSrcAndTarget<K,EV>
implements FlatMapFunction<Edge<K,EV>, Vertex<K,NullValue>> {
......
......@@ -21,6 +21,7 @@ package org.apache.flink.graph.library.clustering.directed;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -121,6 +122,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
.groupBy(0)
.reduce(new CountTriangles<K>())
.setCombineHint(CombineHint.HASH)
.name("Count triangles");
// u, deg(u)
......
......@@ -21,6 +21,7 @@ package org.apache.flink.graph.library.clustering.undirected;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -122,6 +123,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
.groupBy(0)
.reduce(new CountTriangles<K>())
.setCombineHint(CombineHint.HASH)
.name("Count triangles");
// u, deg(u)
......
......@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
......@@ -169,6 +170,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
.name("Initial scores")
.groupBy(0)
.reduce(new SumScores<K>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
......@@ -185,6 +187,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
.name("Hub")
.groupBy(0)
.reduce(new SumScore<K>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
......@@ -194,6 +197,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
.setParallelism(parallelism)
.name("Square")
.reduce(new Sum())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
......@@ -207,6 +211,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
.name("Authority")
.groupBy(0)
.reduce(new SumScore<K>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
......@@ -216,6 +221,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
.setParallelism(parallelism)
.name("Square")
.reduce(new Sum())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册