diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml index a36ab4bcc715fa4b93ba0cf9428741d0553d8ec1..524566744d077293b961e1bb926dffd898c0bcab 100644 --- a/flink-staging/flink-gelly/pom.xml +++ b/flink-staging/flink-gelly/pom.xml @@ -57,4 +57,35 @@ under the License. ${guava.version} + + + + + hadoop-1 + + + + hadoop.profile1 + + + + + + com.google.guava + guava + ${guava.version} + provided + + + + + hadoop-2 + + + + !hadoop.profile + + + + diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java old mode 100644 new mode 100755 index 62173e3c6e730863fe353545c4c725105817fe15..2a66acad3e64557e7dfb0ce27bfaaa68c87fa77f --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -43,15 +43,18 @@ import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.graph.spargel.IterationConfiguration; +import org.apache.flink.graph.gsa.ApplyFunction; +import org.apache.flink.graph.gsa.GatherFunction; +import org.apache.flink.graph.gsa.GatherSumApplyIteration; +import org.apache.flink.graph.gsa.SumFunction; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.utils.EdgeToTuple3Map; +import org.apache.flink.graph.utils.GraphUtils; import org.apache.flink.graph.utils.Tuple2ToVertexMap; import 
org.apache.flink.graph.utils.Tuple3ToEdgeMap; import org.apache.flink.graph.utils.VertexToTuple2Map; @@ -79,7 +82,8 @@ public class Graph & Serializable, VV extends Serializab private final DataSet> edges; /** - * Creates a graph from two DataSets: vertices and edges + * Creates a graph from two DataSets: vertices and edges and allow setting + * the undirected property * * @param vertices a DataSet of vertices. * @param edges a DataSet of edges. @@ -347,7 +351,7 @@ public class Graph & Serializable, VV extends Serializab @Override public void join(Tuple4 tripletWithSrcValSet, - Vertex vertex, Collector> collector) throws Exception { + Vertex vertex, Collector> collector) throws Exception { collector.collect(new Triplet(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1, tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3)); @@ -914,7 +918,7 @@ public class Graph & Serializable, VV extends Serializab } /** - * @return a long integer representing the number of edges + * @return the number of edges in the graph, as a long */ public long numberOfEdges() throws Exception { return edges.count(); @@ -1011,6 +1015,13 @@ public class Graph & Serializable, VV extends Serializab } } + private static final class CheckIfOneComponentMapper implements MapFunction { + @Override + public Boolean map(Integer n) { + return (n == 1); + } + } + /** * Adds the input vertex and edges to the graph. 
If the vertex already * exists in the graph, it will not be added again, but the given edges @@ -1165,7 +1176,7 @@ public class Graph & Serializable, VV extends Serializab int maximumNumberOfIterations) { return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction, - maximumNumberOfIterations, null); + maximumNumberOfIterations, null); } /** @@ -1397,4 +1408,4 @@ public class Graph & Serializable, VV extends Serializab return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null); } } -} +} \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java new file mode 100755 index 0000000000000000000000000000000000000000..4acd86c9a5ea6c8c1bd1d588bbfaa8699134ffb7 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.gsa.ApplyFunction; +import org.apache.flink.graph.gsa.GatherFunction; +import org.apache.flink.graph.gsa.GatherSumApplyIteration; +import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.graph.gsa.RichEdge; +import org.apache.flink.util.Collector; + +import java.util.HashSet; + +/** + * This is an implementation of the Greedy Graph Coloring algorithm, using a gather-sum-apply iteration + */ +public class GSAGreedyGraphColoringExample implements ProgramDescription { + + // -------------------------------------------------------------------------------------------- + // Program + // -------------------------------------------------------------------------------------------- + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> vertices = getVertexDataSet(env); + DataSet> edges = getEdgeDataSet(env); + + Graph graph = Graph.fromDataSet(vertices, edges, env); + + // Gather the source vertex value of each edge into a one-element set + GatherFunction> gather = new GreedyGraphColoringGather(); + + // Merge the sets between neighbors + SumFunction> sum = new GreedyGraphColoringSum(); + + // Find the minimum value in the gathered set, which will be propagated + ApplyFunction> apply = new GreedyGraphColoringApply(); + + // Execute the GSA iteration + GatherSumApplyIteration> 
iteration = + graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations); + Graph result = graph.runGatherSumApplyIteration(iteration); + + // Extract the vertices as the result + DataSet> greedyGraphColoring = result.getVertices(); + + // emit result + if (fileOutput) { + greedyGraphColoring.writeAsCsv(outputPath, "\n", " "); + } else { + greedyGraphColoring.print(); + } + + env.execute("GSA Greedy Graph Coloring"); + } + + // -------------------------------------------------------------------------------------------- + // Greedy Graph Coloring UDFs + // -------------------------------------------------------------------------------------------- + + private static final class GreedyGraphColoringGather + extends GatherFunction> { + @Override + public HashSet gather(RichEdge richEdge) { + + HashSet result = new HashSet(); + result.add(richEdge.getSrcVertexValue()); + + return result; + } + }; + + private static final class GreedyGraphColoringSum + extends SumFunction> { + @Override + public HashSet sum(HashSet newValue, HashSet currentValue) { + + HashSet result = new HashSet(); + result.addAll(newValue); + result.addAll(currentValue); + + return result; + } + }; + + private static final class GreedyGraphColoringApply + extends ApplyFunction> { + @Override + public void apply(HashSet set, Double src) { + double minValue = src; + for (Double d : set) { + if (d < minValue) { + minValue = d; + } + } + + // This is the condition that enables the termination of the iteration + if (minValue < src) { + setResult(minValue); + } + } + }; + + // -------------------------------------------------------------------------------------------- + // Util methods + // -------------------------------------------------------------------------------------------- + + private static boolean fileOutput = false; + private static String vertexInputPath = null; + private static String edgeInputPath = null; + private static String outputPath = null; + + private static int 
maxIterations = 16; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + // parse input arguments + fileOutput = true; + + if(args.length != 4) { + System.err.println("Usage: GSAGreedyGraphColoringExample " + + " "); + return false; + } + + vertexInputPath = args[0]; + edgeInputPath = args[1]; + outputPath = args[2]; + maxIterations = Integer.parseInt(args[3]); + } else { + System.out.println("Executing GSA Greedy Graph Coloring example with built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" Usage: GSAGreedyGraphColoringExample " + + " "); + } + return true; + } + + private static DataSet> getVertexDataSet(ExecutionEnvironment env) { + if(fileOutput) { + return env + .readCsvFile(vertexInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Double.class) + .map(new MapFunction, Vertex>() { + @Override + public Vertex map(Tuple2 value) throws Exception { + return new Vertex(value.f0, value.f1); + } + }); + } + + return env.generateSequence(0, 5).map(new MapFunction>() { + @Override + public Vertex map(Long value) throws Exception { + return new Vertex(value, (double) value); + } + }); + } + + private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { + if(fileOutput) { + return env.readCsvFile(edgeInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new MapFunction, Edge>() { + @Override + public Edge map(Tuple3 value) throws Exception { + return new Edge(value.f0, value.f1, value.f2); + } + }); + } + + return env.generateSequence(0, 5).flatMap(new FlatMapFunction>() { + @Override + public void flatMap(Long value, Collector> out) throws Exception { + out.collect(new Edge(value, (value + 1) % 6, 0.0)); + out.collect(new Edge(value, (value + 2) % 6, 0.0)); + } + }); + } + + @Override + 
public String getDescription() { + return "GSA Greedy Graph Coloring"; + } + +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java new file mode 100755 index 0000000000000000000000000000000000000000..9c8328ba36a934804c76a7e1b8f3df0fbf99cbc1 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; +import org.apache.flink.graph.gsa.ApplyFunction; +import org.apache.flink.graph.gsa.GatherFunction; +import org.apache.flink.graph.gsa.GatherSumApplyIteration; +import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.graph.gsa.RichEdge; + +import java.io.Serializable; + +/** + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration + */ +public class GSASingleSourceShortestPathsExample implements ProgramDescription { + + // -------------------------------------------------------------------------------------------- + // Program + // -------------------------------------------------------------------------------------------- + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> vertices = getVertexDataSet(env); + DataSet> edges = getEdgeDataSet(env); + + Graph graph = Graph.fromDataSet(vertices, edges, env); + + // The path from src to trg through edge e costs src + e + GatherFunction gather = new SingleSourceShortestPathGather(); + + // Return the smaller path length to minimize distance + SumFunction sum = new SingleSourceShortestPathSum(); + + // Iterate as long as the distance is updated + ApplyFunction apply = new SingleSourceShortestPathApply(); + + // Execute the GSA iteration + GatherSumApplyIteration 
iteration = graph.createGatherSumApplyIteration( + gather, sum, apply, maxIterations); + Graph result = graph.mapVertices(new InitVerticesMapper(srcVertexId)) + .runGatherSumApplyIteration(iteration); + + // Extract the vertices as the result + DataSet> singleSourceShortestPaths = result.getVertices(); + + // emit result + if(fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " "); + } else { + singleSourceShortestPaths.print(); + } + + env.execute("GSA Single Source Shortest Paths Example"); + } + + public static final class InitVerticesMapper & Serializable> + implements MapFunction, Double> { + + private K srcVertexId; + + public InitVerticesMapper(K srcId) { + this.srcVertexId = srcId; + } + + public Double map(Vertex value) { + if (value.f0.equals(srcVertexId)) { + return 0.0; + } else { + return Double.POSITIVE_INFINITY; + } + } + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + private static final class SingleSourceShortestPathGather + extends GatherFunction { + @Override + public Double gather(RichEdge richEdge) { + return richEdge.getSrcVertexValue() + richEdge.getEdgeValue(); + } + }; + + private static final class SingleSourceShortestPathSum + extends SumFunction { + @Override + public Double sum(Double newValue, Double currentValue) { + return Math.min(newValue, currentValue); + } + }; + + private static final class SingleSourceShortestPathApply + extends ApplyFunction { + @Override + public void apply(Double summed, Double target) { + if (summed < target) { + setResult(summed); + } + } + }; + + // -------------------------------------------------------------------------------------------- + // Util methods + // -------------------------------------------------------------------------------------------- + + private static boolean fileOutput = 
false; + private static String vertexInputPath = null; + private static String edgeInputPath = null; + private static String outputPath = null; + + private static int maxIterations = 2; + private static long srcVertexId = 1; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + // parse input arguments + fileOutput = true; + + if(args.length != 5) { + System.err.println("Usage: GSASingleSourceShortestPathsExample " + + " "); + return false; + } + + vertexInputPath = args[0]; + edgeInputPath = args[1]; + outputPath = args[2]; + srcVertexId = Long.parseLong(args[3]); + maxIterations = Integer.parseInt(args[4]); + } else { + System.out.println("Executing GSA Single Source Shortest Paths example with built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" Usage: GSASingleSourceShortestPathsExample " + + " "); + } + return true; + } + + private static DataSet> getVertexDataSet(ExecutionEnvironment env) { + if(fileOutput) { + return env + .readCsvFile(vertexInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Double.class) + .map(new MapFunction, Vertex>() { + @Override + public Vertex map(Tuple2 value) throws Exception { + return new Vertex(value.f0, value.f1); + } + }); + } else { + return SingleSourceShortestPathsData.getDefaultVertexDataSet(env); + } + } + + private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { + if(fileOutput) { + return env.readCsvFile(edgeInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new MapFunction, Edge>() { + @Override + public Edge map(Tuple3 value) throws Exception { + return new Edge(value.f0, value.f1, value.f2); + } + }); + } else { + return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); + } + } + + @Override + public String getDescription() 
{ + return "GSA Single Source Shortest Paths"; + } +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java new file mode 100755 index 0000000000000000000000000000000000000000..d863da1e55926cbfa092110fc60b2f74f096302c --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.gsa; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +public abstract class ApplyFunction implements Serializable { + + public abstract void apply(M message, VV vertexValue); + + /** + * Sets the result for the apply function + * + * @param result the result of the apply phase + */ + public void setResult(VV result) { + out.collect(result); + } + + /** + * This method is executed once per superstep before the vertex update function is invoked for each vertex. + * + * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail. 
+ */ + public void preSuperstep() {}; + + /** + * This method is executed once per superstep after the vertex update function has been invoked for each vertex. + * + * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail. + */ + public void postSuperstep() {}; + + // -------------------------------------------------------------------------------------------- + // Internal methods + // -------------------------------------------------------------------------------------------- + + private IterationRuntimeContext runtimeContext; + + private Collector out; + + public void init(IterationRuntimeContext iterationRuntimeContext) { + this.runtimeContext = iterationRuntimeContext; + }; + + public void setOutput(Collector out) { + this.out = out; + } + +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java new file mode 100755 index 0000000000000000000000000000000000000000..91a468de55d9224532244366340b60c07453dd1d --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.gsa; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; + +import java.io.Serializable; + +public abstract class GatherFunction implements Serializable { + + public abstract M gather(RichEdge richEdge); + + /** + * This method is executed once per superstep before the vertex update function is invoked for each vertex. + * + * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail. + */ + public void preSuperstep() {}; + + /** + * This method is executed once per superstep after the vertex update function has been invoked for each vertex. + * + * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail. + */ + public void postSuperstep() {}; + + // -------------------------------------------------------------------------------------------- + // Internal methods + // -------------------------------------------------------------------------------------------- + + private IterationRuntimeContext runtimeContext; + + public void init(IterationRuntimeContext iterationRuntimeContext) { + this.runtimeContext = iterationRuntimeContext; + }; +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java new file mode 100755 index 0000000000000000000000000000000000000000..426efbb3ac48d4a5c8697d1ad2b629c1fbbf5351 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.gsa; + +import org.apache.commons.lang3.Validate; +import org.apache.flink.api.common.functions.RichFlatJoinFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.CustomUnaryOperation; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** + * This class represents iterative graph computations, programmed in a gather-sum-apply perspective. 
+ * + * @param <K> The type of the vertex key in the graph + * @param <VV> The type of the vertex value in the graph + * @param <EV> The type of the edge value in the graph + * @param <M> The intermediate type used by the gather, sum and apply functions + */ +public class GatherSumApplyIteration & Serializable, + VV extends Serializable, EV extends Serializable, M> implements CustomUnaryOperation, + Vertex> { + + private DataSet> vertexDataSet; + private DataSet> edgeDataSet; + + private final GatherFunction gather; + private final SumFunction sum; + private final ApplyFunction apply; + private final int maximumNumberOfIterations; + + private String name; + private int parallelism = -1; + + // ---------------------------------------------------------------------------------- + + private GatherSumApplyIteration(GatherFunction gather, SumFunction sum, + ApplyFunction apply, DataSet> edges, int maximumNumberOfIterations) { + + Validate.notNull(gather); + Validate.notNull(sum); + Validate.notNull(apply); + Validate.notNull(edges); + Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one."); + + this.gather = gather; + this.sum = sum; + this.apply = apply; + this.edgeDataSet = edges; + this.maximumNumberOfIterations = maximumNumberOfIterations; + } + + + /** + * Sets the name for the gather-sum-apply iteration. The name is displayed in logs and messages. + * + * @param name The name for the iteration. + */ + public void setName(String name) { + this.name = name; + } + + /** + * Gets the name from this gather-sum-apply iteration. + * + * @return The name of the iteration. + */ + public String getName() { + return name; + } + + /** + * Sets the degree of parallelism for the iteration. + * + * @param parallelism The degree of parallelism. 
+ */ + public void setParallelism(int parallelism) { + Validate.isTrue(parallelism > 0 || parallelism == -1, + "The degree of parallelism must be positive, or -1 (use default)."); + this.parallelism = parallelism; + } + + /** + * Gets the iteration's degree of parallelism. + * + * @return The iterations parallelism, or -1, if not set. + */ + public int getParallelism() { + return parallelism; + } + + // -------------------------------------------------------------------------------------------- + // Custom Operator behavior + // -------------------------------------------------------------------------------------------- + + /** + * Sets the input data set for this operator. In the case of this operator this input data set represents + * the set of vertices with their initial state. + * + * @param dataSet The input data set, which in the case of this operator represents the set of + * vertices with their initial state. + */ + @Override + public void setInput(DataSet> dataSet) { + this.vertexDataSet = dataSet; + } + + /** + * Computes the results of the gather-sum-apply iteration + * + * @return The resulting DataSet + */ + @Override + public DataSet> createResult() { + if (vertexDataSet == null) { + throw new IllegalStateException("The input data set has not been set."); + } + + // Prepare type information + TypeInformation keyType = ((TupleTypeInfo) vertexDataSet.getType()).getTypeAt(0); + TypeInformation messageType = TypeExtractor.createTypeInfo(GatherFunction.class, gather.getClass(), 2, null, null); + TypeInformation> innerType = new TupleTypeInfo>(keyType, messageType); + TypeInformation> outputType = vertexDataSet.getType(); + + // Prepare UDFs + GatherUdf gatherUdf = new GatherUdf(gather, innerType); + SumUdf sumUdf = new SumUdf(sum, innerType); + ApplyUdf applyUdf = new ApplyUdf(apply, outputType); + + final int[] zeroKeyPos = new int[] {0}; + final DeltaIteration, Vertex> iteration = + vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, 
zeroKeyPos); + + // Prepare the rich edges + DataSet, Edge>> richEdges = iteration + .getWorkset() + .join(edgeDataSet) + .where(0) + .equalTo(0); + + // Gather, sum and apply + DataSet> gatheredSet = richEdges.map(gatherUdf); + DataSet> summedSet = gatheredSet.groupBy(0).reduce(sumUdf); + DataSet> appliedSet = summedSet + .join(iteration.getSolutionSet()) + .where(0) + .equalTo(0) + .with(applyUdf); + + return iteration.closeWith(appliedSet, appliedSet); + } + + /** + * Creates a new gather-sum-apply iteration operator for graphs + * + * @param edges The edge DataSet + * + * @param gather The gather function of the GSA iteration + * @param sum The sum function of the GSA iteration + * @param apply The apply function of the GSA iteration + * + * @param maximumNumberOfIterations The maximum number of iterations executed + * + * @param <K> The type of the vertex key in the graph + * @param <VV> The type of the vertex value in the graph + * @param <EV> The type of the edge value in the graph + * @param <M> The intermediate type used by the gather, sum and apply functions + * + * @return An instance of the gather-sum-apply graph computation operator. 
+ */ + public static final & Serializable, VV extends Serializable, EV extends Serializable, M> + GatherSumApplyIteration withEdges(DataSet> edges, + GatherFunction gather, SumFunction sum, ApplyFunction apply, + int maximumNumberOfIterations) { + return new GatherSumApplyIteration(gather, sum, apply, edges, maximumNumberOfIterations); + } + + // -------------------------------------------------------------------------------------------- + // Wrapping UDFs + // -------------------------------------------------------------------------------------------- + + /** + * Map wrapper around the user {@code GatherFunction}: for each (vertex, edge) pair it builds a + * {@code RichEdge} from the vertex value and the edge value, calls {@code gather} on it, and + * emits the result keyed by the edge's target id. + */ + private static final class GatherUdf & Serializable, VV extends Serializable, + EV extends Serializable, M> extends RichMapFunction, Edge>, + Tuple2> implements ResultTypeQueryable> { + + private final GatherFunction gatherFunction; + /* transient: excluded from Java serialization of this UDF; only handed out via getProducedType() */ + private transient TypeInformation> resultType; + + private GatherUdf(GatherFunction gatherFunction, TypeInformation> resultType) { + this.gatherFunction = gatherFunction; + this.resultType = resultType; + } + + @Override + public Tuple2 map(Tuple2, Edge> richEdge) throws Exception { + RichEdge userRichEdge = new RichEdge(richEdge.f0.getValue(), + richEdge.f1.getValue()); + + K key = richEdge.f1.getTarget(); + M result = this.gatherFunction.gather(userRichEdge); + return new Tuple2(key, result); + } + + @Override + public void open(Configuration parameters) throws Exception { + /* init() is called only when the superstep number is 1; preSuperstep() is called on every open() */ + if (getIterationRuntimeContext().getSuperstepNumber() == 1) { + this.gatherFunction.init(getIterationRuntimeContext()); + } + this.gatherFunction.preSuperstep(); + } + + @Override + public void close() throws Exception { + this.gatherFunction.postSuperstep(); + } + + @Override + public TypeInformation> getProducedType() { + return this.resultType; + } + } + + /** + * Reduce wrapper around the user {@code SumFunction}: combines the values of two tuples with + * {@code sum} and emits the result under the key of the first tuple. + */ + private static final class SumUdf & Serializable, VV extends Serializable, + EV extends Serializable, M> extends RichReduceFunction> + implements ResultTypeQueryable>{ + + private final SumFunction sumFunction; + private transient TypeInformation> 
resultType; + + private SumUdf(SumFunction sumFunction, TypeInformation> resultType) { + this.sumFunction = sumFunction; + this.resultType = resultType; + } + + @Override + public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws Exception { + K key = arg0.f0; + M result = this.sumFunction.sum(arg0.f1, arg1.f1); + return new Tuple2(key, result); + } + + @Override + public void open(Configuration parameters) throws Exception { + /* init() is called only when the superstep number is 1; preSuperstep() is called on every open() */ + if (getIterationRuntimeContext().getSuperstepNumber() == 1) { + this.sumFunction.init(getIterationRuntimeContext()); + } + this.sumFunction.preSuperstep(); + } + + @Override + public void close() throws Exception { + this.sumFunction.postSuperstep(); + } + + @Override + public TypeInformation> getProducedType() { + return this.resultType; + } + } + + /** + * Join wrapper around the user {@code ApplyFunction}: passes {@code arg0.f1} and the value of the + * joined vertex to {@code apply}, and wraps every value the user function emits into a + * {@code Vertex} carrying the id of that joined vertex. + */ + private static final class ApplyUdf & Serializable, + VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction, + Vertex, Vertex> implements ResultTypeQueryable> { + + private final ApplyFunction applyFunction; + private transient TypeInformation> resultType; + + private ApplyUdf(ApplyFunction applyFunction, TypeInformation> resultType) { + this.applyFunction = applyFunction; + this.resultType = resultType; + } + + @Override + public void join(Tuple2 arg0, Vertex arg1, final Collector> out) throws Exception { + + final K key = arg1.getId(); + /* adapter collector: the user function emits plain values, which are forwarded to out as Vertex(key, value) */ + Collector userOut = new Collector() { + @Override + public void collect(VV record) { + out.collect(new Vertex(key, record)); + } + + @Override + public void close() { + out.close(); + } + }; + + /* a fresh adapter is installed on every join() call because it captures this call's key and out */ + this.applyFunction.setOutput(userOut); + this.applyFunction.apply(arg0.f1, arg1.getValue()); + } + + @Override + public void open(Configuration parameters) throws Exception { + /* init() is called only when the superstep number is 1; preSuperstep() is called on every open() */ + if (getIterationRuntimeContext().getSuperstepNumber() == 1) { + this.applyFunction.init(getIterationRuntimeContext()); + } + this.applyFunction.preSuperstep(); + } + + @Override + public void close() throws Exception { + this.applyFunction.postSuperstep(); + } + + 
@Override + public TypeInformation> getProducedType() { + return this.resultType; + } + } + +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java new file mode 100755 index 0000000000000000000000000000000000000000..9befccba13e418220f89d1196ba9cf1ac8153cdb --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.gsa; + +import org.apache.flink.api.java.tuple.Tuple2; + +import java.io.Serializable; + +/** + * A wrapper around Tuple2 for convenience in the GatherFunction: field f0 holds the + * source vertex value and field f1 holds the edge value. + * @param <VV> the vertex value type + * @param <EV> the edge value type + */ +public class RichEdge + extends Tuple2 { + + public RichEdge() {} + + public RichEdge(VV src, EV edge) { + super(src, edge); + } + + /** @return the source vertex value (tuple field f0) */ + public VV getSrcVertexValue() { + return this.f0; + } + + /** @return the edge value (tuple field f1) */ + public EV getEdgeValue() { + return this.f1; + } +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java new file mode 100755 index 0000000000000000000000000000000000000000..b616194fb218c1ecdbb2006d6a31a371e84d50ae --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.gsa; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; + +import java.io.Serializable; + +/** + * Base class for the user-defined sum step of a gather-sum-apply iteration. + * Subclasses implement {@link #sum} to combine two values of type {@code M} into one. + */ +public abstract class SumFunction implements Serializable { + + /** + * Combines two values into a single value of the same type. + * + * @param arg0 the first value + * @param arg1 the second value + * @return the combined value + */ + public abstract M sum(M arg0, M arg1); + + /** + * This method is executed once per superstep before the sum function is invoked. + * The default implementation does nothing. + * + * The signature declares no checked exception; an unchecked exception thrown here + * propagates out of the wrapping UDF's open(). + */ + public void preSuperstep() {}; + + /** + * This method is executed once per superstep after the sum function has been invoked. + * The default implementation does nothing. + * + * The signature declares no checked exception; an unchecked exception thrown here + * propagates out of the wrapping UDF's close(). + */ + public void postSuperstep() {}; + + // -------------------------------------------------------------------------------------------- + // Internal methods + // -------------------------------------------------------------------------------------------- + + /* set by init(); no accessor for it is visible in this class */ + private IterationRuntimeContext runtimeContext; + + /** + * Stores the iteration runtime context. The wrapping UDF calls this from open() when the + * superstep number is 1. + * + * @param iterationRuntimeContext the context to remember + */ + public void init(IterationRuntimeContext iterationRuntimeContext) { + this.runtimeContext = iterationRuntimeContext; + }; +} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java new file mode 100755 index 0000000000000000000000000000000000000000..321d5306dcaba31d22c364358cf5e437683026dc --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.graph.example.GSAGreedyGraphColoringExample; +import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +/** + * Integration test for the gather-sum-apply example programs. Each test runs an example's + * main() on the sample graph files written by before() and sets expectedResult; after() + * then compares the output at resultPath with expectedResult. + */ +@RunWith(Parameterized.class) +public class GatherSumApplyITCase extends MultipleProgramsTestBase { + + public GatherSumApplyITCase(TestExecutionMode mode){ + super(mode); + } + + private String verticesPath; + private String edgesPath; + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + /** + * Creates a temporary result file and writes the VERTICES and EDGES sample data to two + * further temporary files (UTF-8), keeping the URI strings of all three for the tests. + */ + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + File verticesFile = tempFolder.newFile(); + Files.write(GatherSumApplyITCase.VERTICES, verticesFile, Charsets.UTF_8); + + File edgesFile = tempFolder.newFile(); + Files.write(GatherSumApplyITCase.EDGES, edgesFile, Charsets.UTF_8); + + verticesPath = verticesFile.toURI().toString(); + edgesPath = edgesFile.toURI().toString(); + + } + + /** + * Runs after every test and passes the expectedResult set by that test, together with + * resultPath, to compareResultsByLinesInMemory. + */ + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + // 
-------------------------------------------------------------------------------------------- + // Greedy Graph Coloring Test + // -------------------------------------------------------------------------------------------- + + @Test + public void testGreedyGraphColoring() throws Exception { + /* NOTE(review): the trailing "16" is presumably the iteration limit - confirm against the example's argument parsing */ + GSAGreedyGraphColoringExample.main(new String[] {verticesPath, edgesPath, resultPath, "16"}); + expectedResult = "1 1.0\n" + + "2 1.0\n" + + "3 1.0\n" + + "4 1.0\n" + + "5 1.0\n"; + + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path Test + // -------------------------------------------------------------------------------------------- + + @Test + public void testSingleSourceShortestPath() throws Exception { + /* NOTE(review): "1" is presumably the source vertex id and "16" the iteration limit - confirm against the example */ + GSASingleSourceShortestPathsExample.main(new String[]{verticesPath, edgesPath, resultPath, "1", "16"}); + expectedResult = "1 0.0\n" + + "2 12.0\n" + + "3 13.0\n" + + "4 47.0\n" + + "5 48.0\n"; + } + + + // -------------------------------------------------------------------------------------------- + // Sample data + // -------------------------------------------------------------------------------------------- + + /* one vertex per line; presumably "<id> <value>" - confirm against the examples' input parsing */ + private static final String VERTICES = "1 1.0\n" + + "2 2.0\n" + + "3 3.0\n" + + "4 4.0\n" + + "5 5.0\n"; + + /* one edge per line; presumably "<source id> <target id> <value>" - confirm against the examples' input parsing */ + private static final String EDGES = "1 2 12.0\n" + + "1 3 13.0\n" + + "2 3 23.0\n" + + "3 4 34.0\n" + + "3 5 35.0\n" + + "4 5 45.0\n" + + "5 1 51.0\n"; + +}