From e281e4d6fc22e498f56d88b0f661972345bf0e55 Mon Sep 17 00:00:00 2001 From: andralungu Date: Mon, 30 Mar 2015 14:50:09 +0200 Subject: [PATCH] [FLINK-1741] [gelly] Adds Jaccard Similarity Metric Example This closes #544 --- .../JaccardSimilarityMeasureExample.java | 212 ++++++++++++++++++ .../utils/JaccardSimilarityMeasureData.java | 58 +++++ ...JaccardSimilarityMeasureExampleITCase.java | 74 ++++++ 3 files changed, 344 insertions(+) create mode 100644 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java create mode 100644 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java create mode 100644 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java new file mode 100644 index 00000000000..c81aeb3b74e --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.EdgesFunction; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; +import org.apache.flink.types.NullValue; + +import java.util.HashSet; + +/** + * Given a directed, unweighted graph, return a weighted graph where the edge values are equal + * to the Jaccard similarity coefficient - the number of common neighbors divided by the the size + * of the union of neighbor sets - for the src and target vertices. + * + *

+ * Input files are plain text files and must be formatted as follows: + *
+ * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. + * Edges themselves are separated by newlines. + * For example: 1 2\n1 3\n defines two edges 1-2 and 1-3. + *

+ * + * Usage JaccardSimilarityMeasureExample <edge path> <result path>
+ * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData} + */ +@SuppressWarnings("serial") +public class JaccardSimilarityMeasureExample implements ProgramDescription { + + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> edges = getEdgesDataSet(env); + + Graph graph = Graph.fromDataSet(edges, env); + + DataSet>> verticesWithNeighbors = + graph.reduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL); + + Graph, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env); + + // the edge value will be the Jaccard similarity coefficient(number of common neighbors/ all neighbors) + DataSet> edgesWithJaccardWeight = graphWithVertexValues.getTriplets() + .map(new WeighEdgesMapper()); + + DataSet> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight, + new MapFunction, Double>() { + + @Override + public Double map(Tuple2 value) throws Exception { + return value.f1; + } + }).getEdges(); + + // emit result + if (fileOutput) { + result.writeAsCsv(outputPath, "\n", ","); + } else { + result.print(); + } + + env.execute("Executing Jaccard Similarity Measure"); + } + + @Override + public String getDescription() { + return "Vertex Jaccard Similarity Measure"; + } + + /** + * Each vertex will have a HashSet containing its neighbor ids as value. + */ + private static final class GatherNeighbors implements EdgesFunction>> { + + @Override + public Vertex> iterateEdges(Iterable>> edges) throws Exception { + + HashSet neighborsHashSet = new HashSet(); + long vertexId = -1; + + for(Tuple2> edge : edges) { + neighborsHashSet.add(getNeighborID(edge)); + vertexId = edge.f0; + } + return new Vertex>(vertexId, neighborsHashSet); + } + } + + /** + * The edge weight will be the Jaccard coefficient, which is computed as follows: + * + * Consider the edge x-y + * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively. + * sizeX+sizeY = union + intersection of neighborhoods + * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods + * The intersection can then be deduced. + * + * The Jaccard similarity coefficient is then, the intersection/union. + */ + private static class WeighEdgesMapper implements MapFunction, Double>, + Tuple3> { + + @Override + public Tuple3 map(Triplet, Double> triplet) + throws Exception { + + Vertex> source = triplet.getSrcVertex(); + Vertex> target = triplet.getTrgVertex(); + + long unionPlusIntersection = source.getValue().size() + target.getValue().size(); + // within a HashSet, all elements are distinct + source.getValue().addAll(target.getValue()); + // the source value contains the union + long union = source.getValue().size(); + long intersection = unionPlusIntersection - union; + + return new Tuple3(source.getId(), target.getId(), (double) intersection/union); + } + } + + /** + * Helper method that extracts the neighborId given an edge. + * @param edge + * @return + */ + private static Long getNeighborID(Tuple2> edge) { + if(edge.f1.getSource() == edge.f0) { + return edge.f1.getTarget(); + } else { + return edge.f1.getSource(); + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String edgeInputPath = null; + private static String outputPath = null; + + private static boolean parseParameters(String [] args) { + if(args.length > 0) { + if(args.length != 2) { + System.err.println("Usage JaccardSimilarityMeasureExample "); + return false; + } + + fileOutput = true; + edgeInputPath = args[0]; + outputPath = args[1]; + } else { + System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("Usage JaccardSimilarityMeasureExample "); + } + + return true; + } + + private static DataSet> getEdgesDataSet(ExecutionEnvironment env) { + + if(fileOutput) { + return env.readCsvFile(edgeInputPath) + .ignoreComments("#") + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction, Edge>() { + @Override + public Edge map(Tuple2 tuple2) throws Exception { + return new Edge(tuple2.f0, tuple2.f1, new Double(0)); + } + }); + } else { + return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env); + } + } +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java new file mode 100644 index 00000000000..7564b956d05 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example.utils; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; + +import java.util.ArrayList; +import java.util.List; + +/** + * Provides the default data sets used for the Jaccard Similarity Measure example program. + * If no parameters are given to the program, the default data sets are used. + */ +public class JaccardSimilarityMeasureData { + + public static final String EDGES = "1 2\n" + "1 3\n" + "1 4\n" + "1 5\n" + "2 3\n" + "2 4\n" + + "2 5\n" + "3 4\n" + "3 5\n" + "4 5"; + + public static DataSet> getDefaultEdgeDataSet(ExecutionEnvironment env) { + + List> edges = new ArrayList>(); + edges.add(new Edge(1L, 2L, new Double(0))); + edges.add(new Edge(1L, 3L, new Double(0))); + edges.add(new Edge(1L, 4L, new Double(0))); + edges.add(new Edge(1L, 5L, new Double(0))); + edges.add(new Edge(2L, 3L, new Double(0))); + edges.add(new Edge(2L, 4L, new Double(0))); + edges.add(new Edge(2L, 5L, new Double(0))); + edges.add(new Edge(3L, 4L, new Double(0))); + edges.add(new Edge(3L, 5L, new Double(0))); + edges.add(new Edge(4L, 5L, new Double(0))); + + return env.fromCollection(edges); + } + + public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + "1,4,0.6\n" + "1,5,0.6\n" + + "2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + "3,5,0.6\n" + "4,5,0.6"; + + private JaccardSimilarityMeasureData() {} +} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java new file mode 100644 index 00000000000..7269ed7ff66 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.example; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.graph.example.JaccardSimilarityMeasureExample; +import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class JaccardSimilarityMeasureExampleITCase extends MultipleProgramsTestBase { + + private String edgesPath; + + private String debugResultPath; + + private String resultPath; + + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + public JaccardSimilarityMeasureExampleITCase(TestExecutionMode mode) { + super(mode); + } + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + + File edgesFile = tempFolder.newFile(); + Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8); + + edgesPath = edgesFile.toURI().toString(); + } + + @Test + public void testJaccardSimilarityMeasureExample() throws Exception { + JaccardSimilarityMeasureExample.main(new String[]{edgesPath, resultPath}); + expected = JaccardSimilarityMeasureData.JACCARD_EDGES; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } +} -- GitLab