提交 97fb9a47 编写于 作者: A Andra Lungu

[FLINK-2570] [gelly] Added a Triangle Count Library Method

[FLINK-2570] [gelly] Added a description of the I/O

This closes #1054
上级 948b6e05
......@@ -800,6 +800,10 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
* Label Propagation
* Simple Community Detection
* Connected Components
* GSA PageRank
* GSA Connected Components
* GSA Single-Source Shortest Paths
* GSA Triangle Count
Gelly's library methods can be used by simply calling the `run()` method on the input graph:
......
......@@ -26,7 +26,7 @@ import java.util.ArrayList;
import java.util.List;
/**
* Provides the default data set used for the Simple Community Detection example program.
* Provides the default data set used for the Simple Community Detection test program.
* If no parameters are given to the program, the default edge data set is used.
*/
public class CommunityDetectionData {
......
......@@ -27,6 +27,10 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.NullValue;
/**
* Provides the default data set used for the Label Propagation test program.
* If no parameters are given to the program, the default edge data set is used.
*/
public class LabelPropagationData {
public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +
......
......@@ -25,6 +25,10 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
/**
* Provides the default data sets used for the Music Profiles example program.
* If no parameters are given to the program, the default data sets are used.
*/
public class MusicProfilesData {
public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) {
......
......@@ -25,6 +25,10 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
/**
* Provides the default data set used for the PageRank test program.
* If no parameters are given to the program, the default edge data set is used.
*/
public class PageRankData {
public static final String EDGES = "2 1\n" +
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.example.utils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.types.NullValue;
import java.util.ArrayList;
import java.util.List;
/**
* Provides the default data sets used for the Triangle Count test program.
* If no parameters are given to the program, the default data sets are used.
*/
public class TriangleCountData {
public static final String EDGES = "1 2\n"+"1 3\n"+"2 3\n"+"2 6\n"+"3 4\n"+"3 5\n"+"3 6\n"+"4 5\n"+"6 7\n";
public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(2L, 6L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(3L, 5L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(3L, 6L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance()));
return env.fromCollection(edges);
}
public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
private TriangleCountData () {}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.library;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.ReduceNeighborsFunction;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;
import java.util.TreeMap;
/**
* Triangle Count Algorithm.
*
* This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs
* and send messages to them. Each received message is then propagated to neighbors with higher id.
* Finally, if a node encounters the target id in the list of received messages, it increments the number
* of triangles found.
*
* This implementation is non - iterative.
*
* The algorithm takes an undirected, unweighted graph as input and outputs a DataSet of
* Tuple1 which contains a single integer representing the number of triangles.
*/
public class GSATriangleCount implements
GraphAlgorithm<Long, NullValue, NullValue, DataSet<Tuple1<Integer>>> {
@Override
public DataSet<Tuple1<Integer>> run(Graph<Long, NullValue, NullValue> input) throws Exception {
ExecutionEnvironment env = input.getContext();
// order the edges so that src is always higher than trg
DataSet<Edge<Long, NullValue>> edges = input.getEdges()
.map(new OrderEdges()).distinct();
Graph<Long, TreeMap<Long, Integer>, NullValue> graph = Graph.fromDataSet(edges,
new VertexInitializer(), env);
// select neighbors with ids higher than the current vertex id
// Gather: a no-op in this case
// Sum: create the set of neighbors
DataSet<Tuple2<Long, TreeMap<Long, Integer>>> higherIdNeighbors =
graph.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithReinitializedVertexValues =
graph.mapVertices(new VertexInitializerEmptyTreeMap());
// Apply: attach the computed values to the vertices
// joinWithVertices to update the node values
DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithHigherIdNeighbors =
graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues()).getVertices();
Graph<Long, TreeMap<Long,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
edges, env);
// propagate each received value to neighbors with higher id
// Gather: a no-op in this case
// Sum: propagate values
DataSet<Tuple2<Long, TreeMap<Long, Integer>>> propagatedValues = graphWithNeighbors
.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
// Apply: attach propagated values to vertices
DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithPropagatedValues =
graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues()).getVertices();
Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithPropagatedNeighbors =
Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
// Scatter: compute the number of triangles
DataSet<Tuple1<Integer>> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
.map(new ComputeTriangles()).reduce(new ReduceFunction<Tuple1<Integer>>() {
@Override
public Tuple1<Integer> reduce(Tuple1<Integer> firstTuple, Tuple1<Integer> secondTuple) throws Exception {
return new Tuple1<Integer>(firstTuple.f0 + secondTuple.f0);
}
});
return numberOfTriangles;
}
@SuppressWarnings("serial")
private static final class OrderEdges implements MapFunction<Edge<Long, NullValue>, Edge<Long, NullValue>> {
@Override
public Edge<Long, NullValue> map(Edge<Long, NullValue> edge) throws Exception {
if (edge.getSource() < edge.getTarget()) {
return new Edge<Long, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
} else {
return edge;
}
}
}
@SuppressWarnings("serial")
private static final class VertexInitializer implements MapFunction<Long, TreeMap<Long, Integer>> {
@Override
public TreeMap<Long, Integer> map(Long value) throws Exception {
TreeMap<Long, Integer> neighbors = new TreeMap<Long, Integer>();
neighbors.put(value, 1);
return neighbors;
}
}
@SuppressWarnings("serial")
private static final class VertexInitializerEmptyTreeMap implements
MapFunction<Vertex<Long, TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
@Override
public TreeMap<Long, Integer> map(Vertex<Long, TreeMap<Long, Integer>> vertex) throws Exception {
return new TreeMap<Long, Integer>();
}
}
@SuppressWarnings("serial")
private static final class AttachValues implements MapFunction<Tuple2<TreeMap<Long, Integer>,
TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
@Override
public TreeMap<Long, Integer> map(Tuple2<TreeMap<Long, Integer>, TreeMap<Long, Integer>> tuple2) throws Exception {
return tuple2.f1;
}
}
@SuppressWarnings("serial")
private static final class GatherHigherIdNeighbors implements ReduceNeighborsFunction<TreeMap<Long,Integer>> {
@Override
public TreeMap<Long,Integer> reduceNeighbors(TreeMap<Long,Integer> first, TreeMap<Long,Integer> second) {
for (Long key : second.keySet()) {
Integer value = first.get(key);
if (value != null) {
first.put(key, value + second.get(key));
} else {
first.put(key, second.get(key));
}
}
return first;
}
}
@SuppressWarnings("serial")
private static final class ComputeTriangles implements MapFunction<Triplet<Long, TreeMap<Long, Integer>, NullValue>,
Tuple1<Integer>> {
@Override
public Tuple1<Integer> map(Triplet<Long, TreeMap<Long, Integer>, NullValue> triplet) throws Exception {
Vertex<Long, TreeMap<Long, Integer>> srcVertex = triplet.getSrcVertex();
Vertex<Long, TreeMap<Long, Integer>> trgVertex = triplet.getTrgVertex();
int triangles = 0;
if(trgVertex.getValue().get(srcVertex.getId()) != null) {
triangles=trgVertex.getValue().get(srcVertex.getId());
}
return new Tuple1<Integer>(triangles);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.test.library;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.example.utils.TriangleCountData;
import org.apache.flink.graph.library.GSATriangleCount;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.List;
@RunWith(Parameterized.class)
public class TriangleCountITCase extends MultipleProgramsTestBase {
private String expectedResult;
public TriangleCountITCase(TestExecutionMode mode) {
super(mode);
}
@Test
public void testGSATriangleCount() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
env).getUndirected();
List<Tuple1<Integer>> numberOfTriangles = graph.run(new GSATriangleCount()).collect();
expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
compareResultAsTuples(numberOfTriangles, expectedResult);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册