Commit d34bdaf7 authored by Greg Hogan, committed by EC2 Default User

[FLINK-3907] [gelly] Directed Clustering Coefficient

This closes #2079
Parent d92aeb7a
......@@ -2112,8 +2112,9 @@ divided by the number of potential edges between neighbors.
See the [Triangle Enumeration](#triangle-enumeration) library method for a detailed explanation of triangle enumeration.
#### Usage
The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing the vertex ID,
vertex degree, and number of triangles containing the vertex. The graph ID type must be `Comparable` and `Copyable`.
Directed and undirected variants are provided. The algorithms take a simple graph as input and output a `DataSet` of
tuples containing the vertex ID, vertex degree, and number of triangles containing the vertex. The graph ID type must be
`Comparable` and `Copyable`.
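A minimal sketch of running the undirected variant, assuming an `ExecutionEnvironment` `env` and a simple undirected `Graph<LongValue, NullValue, NullValue>` named `graph` are in scope (the variant from the `directed` package is invoked identically):

{% highlight java %}
DataSet<LocalClusteringCoefficient.Result<LongValue>> lcc = graph
    .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
{% endhighlight %}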
### Global Clustering Coefficient
......@@ -2126,8 +2127,9 @@ See the [Local Clustering Coefficient](#local-clustering-coefficient) library me
clustering coefficient.
#### Usage
The algorithm takes a simple, undirected graph as input and outputs a result containing the total number of triplets and
triangles in the graph. The graph ID type must be `Comparable` and `Copyable`.
Directed and undirected variants are provided. The algorithm takes a simple graph as input and outputs a result
containing the total number of triplets and triangles in the graph. The graph ID type must be `Comparable` and
`Copyable`.
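`GlobalClusteringCoefficient` is a `GraphAnalytic`, so its result is only available after the program executes. A minimal sketch, reusing `env` and `graph` from the sketch above:

{% highlight java %}
GlobalClusteringCoefficient<LongValue, NullValue, NullValue> gcc =
    new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>();
graph.run(gcc);

env.execute();
System.out.println(gcc.getResult());
{% endhighlight %}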
{% top %}
......
......@@ -204,7 +204,7 @@ public final class Utils {
@Override
public int hashCode() {
return (int) (this.count + this.hashCode());
return (int) (this.count + this.checksum);
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.examples;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.asm.translate.LongValueToIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import java.text.NumberFormat;
/**
* Driver for the library implementations of Global and Local Clustering Coefficient.
*
* This example reads a simple directed or undirected graph from a CSV file or
* generates an RMat graph with the given scale and edge factor then calculates
* the local clustering coefficient for each vertex and the global clustering
* coefficient for the graph.
*
* @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
* @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
* @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
* @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
*/
public class ClusteringCoefficient {
public static final int DEFAULT_SCALE = 10;
public static final int DEFAULT_EDGE_FACTOR = 16;
public static final boolean DEFAULT_CLIP_AND_FLIP = true;
private static void printUsage() {
System.out.println(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" +
" vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." +
" Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" +
" is a clique).", 80));
System.out.println();
System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
" the vertex, and the number of edges between vertex neighbors.", 80));
System.out.println();
System.out.println("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
System.out.println();
System.out.println(" --output print");
System.out.println(" --output hash");
System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
}
public static void main(String[] args) throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
if (! parameters.has("directed")) {
printUsage();
return;
}
boolean directedAlgorithm = parameters.getBoolean("directed");
// global and local clustering coefficient results
GraphAnalytic gcc;
DataSet lcc;
switch (parameters.get("input", "")) {
case "csv": {
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
String fieldDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
Graph<LongValue, NullValue, NullValue> graph = Graph
.fromCsvReader(parameters.get("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter)
.keyType(LongValue.class);
if (directedAlgorithm) {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
lcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
} else {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
lcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
}
} break;
case "rmat": {
int scale = parameters.getInt("scale", DEFAULT_SCALE);
int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate();
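// when scale <= 32 the generated IDs fit within 32 bits and are
// translated to IntValue to reduce memory usage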
if (directedAlgorithm) {
if (scale > 32) {
Graph<LongValue, NullValue, NullValue> newGraph = graph
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
}
} else {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
if (scale > 32) {
Graph<LongValue, NullValue, NullValue> newGraph = graph
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
}
}
} break;
default:
printUsage();
return;
}
switch (parameters.get("output", "")) {
case "print":
if (directedAlgorithm) {
for (Object e: lcc.collect()) {
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result =
(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
System.out.println(result.toVerboseString());
}
} else {
for (Object e: lcc.collect()) {
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result =
(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
System.out.println(result.toVerboseString());
}
}
System.out.println(gcc.getResult());
break;
case "hash":
System.out.println(DataSetUtils.checksumHashCode(lcc));
System.out.println(gcc.getResult());
break;
case "csv":
String filename = parameters.get("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
String fieldDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
System.out.println(gcc.execute());
break;
default:
printUsage();
return;
}
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
......@@ -97,10 +97,10 @@ public class JaccardIndex {
Graph<LongValue, NullValue, NullValue> graph = Graph
.fromCsvReader(parameters.get("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter)
.keyType(LongValue.class);
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter)
.keyType(LongValue.class);
ji = graph
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>());
......@@ -162,6 +162,7 @@ public class JaccardIndex {
env.execute();
break;
default:
printUsage();
return;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.examples;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.asm.translate.LongValueToIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import java.text.NumberFormat;
/**
* Driver for the library implementation of Local Clustering Coefficient.
*
* This example generates an undirected RMat graph with the given scale and
* edge factor then calculates the local clustering coefficient for each vertex.
*
* @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
*/
public class LocalClusteringCoefficient {
public static final int DEFAULT_SCALE = 10;
public static final int DEFAULT_EDGE_FACTOR = 16;
public static final boolean DEFAULT_CLIP_AND_FLIP = true;
public static void main(String[] args) throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
// Generate RMat graph
int scale = parameters.getInt("scale", DEFAULT_SCALE);
int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate()
.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
DataSet cc;
if (scale > 32) {
cc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
} else {
cc = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
}
switch (parameters.get("output", "")) {
case "print":
for (Object e: cc.collect()) {
Result result = (Result)e;
System.out.println(result.toVerboseString());
}
break;
case "hash":
System.out.println(DataSetUtils.checksumHashCode(cc));
break;
case "csv":
String filename = parameters.get("filename");
String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER);
String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
cc.writeAsCsv(filename, row_delimiter, field_delimiter);
env.execute();
break;
default:
System.out.println(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" +
" vertex's neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0 (neighborhood" +
" is a clique).", 80));
System.out.println();
System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
" the vertex, the number of edges between vertex neighbors, and the local clustering coefficient.", 80));
System.out.println();
System.out.println("usage:");
System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
" --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
return;
}
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
......@@ -18,6 +18,8 @@
package org.apache.flink.graph.examples;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
......@@ -41,9 +43,11 @@ import java.text.NumberFormat;
/**
* Driver for the library implementation of Triangle Listing.
*
* This example generates an undirected RMat graph with the given scale
* and edge factor then lists all triangles.
* This example reads a simple directed or undirected graph from a CSV file or
* generates an RMat graph with the given scale and edge factor then lists
* all triangles.
*
* @see org.apache.flink.graph.library.clustering.directed.TriangleListing
* @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
*/
public class TriangleListing {
......@@ -54,68 +58,142 @@ public class TriangleListing {
public static final boolean DEFAULT_CLIP_AND_FLIP = true;
private static void printUsage() {
System.out.println(WordUtils.wrap("Lists all triangles in a graph.", 80));
System.out.println();
System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
" for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80));
System.out.println();
System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
System.out.println();
System.out.println(" --output print");
System.out.println(" --output hash");
System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
}
public static void main(String[] args) throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
// Generate RMat graph
int scale = parameters.getInt("scale", DEFAULT_SCALE);
int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate()
.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
if (! parameters.has("directed")) {
printUsage();
return;
}
boolean directedAlgorithm = parameters.getBoolean("directed");
DataSet tl;
if (scale > 32) {
tl = graph
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
} else {
tl = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
switch (parameters.get("input", "")) {
case "csv": {
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
String fieldDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
Graph<LongValue, NullValue, NullValue> graph = Graph
.fromCsvReader(parameters.get("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter)
.keyType(LongValue.class);
if (directedAlgorithm) {
tl = graph
.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
} else {
tl = graph
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
}
} break;
case "rmat": {
int scale = parameters.getInt("scale", DEFAULT_SCALE);
int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate();
if (directedAlgorithm) {
if (scale > 32) {
tl = graph
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
} else {
tl = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>());
}
} else {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
graph = graph
.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
if (scale > 32) {
tl = graph
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
} else {
tl = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
}
}
} break;
default:
printUsage();
return;
}
switch (parameters.get("output", "")) {
case "print":
tl.print();
break;
case "hash":
System.out.println(DataSetUtils.checksumHashCode(tl));
break;
case "csv":
String filename = parameters.get("filename");
String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER);
String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
tl.writeAsCsv(filename, row_delimiter, field_delimiter);
env.execute();
break;
default:
System.out.println("Lists all distinct triangles in the generated RMat graph.");
System.out.println();
System.out.println("usage:");
System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
" --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
return;
case "print":
if (directedAlgorithm) {
for (Object e: tl.collect()) {
org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
System.out.println(result.toVerboseString());
}
} else {
tl.print();
}
break;
case "hash":
System.out.println(DataSetUtils.checksumHashCode(tl));
break;
case "csv":
String filename = parameters.get("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
String fieldDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
env.execute();
break;
default:
printUsage();
return;
}
JobExecutionResult result = env.getLastJobExecutionResult();
......
......@@ -1127,8 +1127,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*
* @param analytic the analytic to run on the Graph
*/
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T])= {
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]):
GraphAnalytic[K, VV, EV, T] = {
jgraph.run(analytic)
analytic
}
/**
......
......@@ -1797,8 +1797,9 @@ public class Graph<K, VV, EV> {
* @param <T> the result type
* @throws Exception
*/
public <T> void run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception {
public <T> GraphAnalytic<K, VV, EV, T> run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception {
analytic.run(this);
return analytic;
}
/**
......
......@@ -38,7 +38,7 @@ public interface GraphAnalytic<K, VV, EV, T> {
* This method must be called after the program has executed:
* 1) "run" analytics and algorithms
* 2) call ExecutionEnvironment.execute()
* 3) get analytics results
* 3) get analytic results
*
* @return the result
*/
......
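A minimal sketch of that sequence, assuming a simple directed `Graph<LongValue, NullValue, NullValue>` named `graph` and its `ExecutionEnvironment` `env`, using the directed `GlobalClusteringCoefficient` analytic:

    GlobalClusteringCoefficient<LongValue, NullValue, NullValue> gcc =
        new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>();

    graph.run(gcc);                         // 1) run the analytic
    env.execute();                          // 2) execute the program
    System.out.println(gcc.getResult());   // 3) read the analytic result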
......@@ -142,7 +142,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
}
/**
* Combine mutual edges.
* Reduce bitmasks to a single value using bitwise-or.
*
* @param <T> ID type
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.library.clustering.directed;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient.Result;
import org.apache.flink.graph.library.metric.directed.VertexMetrics;
import org.apache.flink.types.CopyableValue;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* The global clustering coefficient measures the connectedness of a graph.
* Scores range from 0.0 (no triangles) to 1.0 (complete graph).
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
*/
public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
extends AbstractGraphAnalytic<K, VV, EV, Result> {
private TriangleCount<K, VV, EV> triangleCount;
private VertexMetrics<K, VV, EV> vertexMetrics;
// Optional configuration
private int littleParallelism = PARALLELISM_DEFAULT;
/**
* Override the parallelism of operators processing small amounts of data.
*
* @param littleParallelism operator parallelism
* @return this
*/
public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
this.littleParallelism = littleParallelism;
return this;
}
/*
* Implementation notes:
*
* The requirement that "K extends CopyableValue<K>" can be removed when
* removed from TriangleListing.
*/
@Override
public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
super.run(input);
triangleCount = new TriangleCount<K, VV, EV>()
.setLittleParallelism(littleParallelism);
input.run(triangleCount);
vertexMetrics = new VertexMetrics<K, VV, EV>()
.setParallelism(littleParallelism);
input.run(vertexMetrics);
return this;
}
@Override
public Result getResult() {
// each distinct triangle is counted three times, once from each of its
// vertices, matching the triplet count, which is tallied per vertex
long numberOfTriangles = 3 * triangleCount.getResult();
return new Result(vertexMetrics.getResult().getNumberOfTriplets(), numberOfTriangles);
}
/**
* Wraps global clustering coefficient metrics.
*/
public static class Result {
private long tripletCount;
private long triangleCount;
/**
* Instantiate an immutable result.
*
* @param tripletCount triplet count
* @param triangleCount triangle count
*/
public Result(long tripletCount, long triangleCount) {
this.tripletCount = tripletCount;
this.triangleCount = triangleCount;
}
/**
* Get the number of triplets.
*
* @return number of triplets
*/
public long getNumberOfTriplets() {
return tripletCount;
}
/**
* Get the number of triangles.
*
* @return number of triangles
*/
public long getNumberOfTriangles() {
return triangleCount;
}
/**
* Get the global clustering coefficient score. This is computed as the
* number of closed triplets (triangles) divided by the total number of
* triplets.
*
* A score of {@code Double.NaN} is returned for a graph containing no
* triplets, for which both the triplet and triangle counts are zero.
*
* @return global clustering coefficient score
*/
public double getGlobalClusteringCoefficientScore() {
return (tripletCount == 0) ? Double.NaN : triangleCount / (double)tripletCount;
}
@Override
public String toString() {
return "triplet count: " + tripletCount
+ ", triangle count: " + triangleCount
+ ", global clustering coefficient: " + getGlobalClusteringCoefficientScore();
}
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(tripletCount)
.append(triangleCount)
.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null) { return false; }
if (obj == this) { return true; }
if (obj.getClass() != getClass()) { return false; }
Result rhs = (Result)obj;
return new EqualsBuilder()
.append(tripletCount, rhs.tripletCount)
.append(triangleCount, rhs.triangleCount)
.isEquals();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.library.clustering.directed;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* The local clustering coefficient measures the connectedness of each vertex's
* neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0
* (neighborhood is a clique).
* <br/>
* An edge between a vertex's neighbors is a triangle. Counting edges between
* neighbors is equivalent to counting the number of triangles which include
* the vertex.
* <br/>
* The input graph must be a simple graph containing no duplicate edges or
* self-loops.
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
*/
public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
// Optional configuration
private int littleParallelism = PARALLELISM_DEFAULT;
/**
* Override the parallelism of operators processing small amounts of data.
*
* @param littleParallelism operator parallelism
* @return this
*/
public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
"The parallelism must be greater than zero.");
this.littleParallelism = littleParallelism;
return this;
}
/*
* Implementation notes:
*
* The requirement that "K extends CopyableValue<K>" can be removed when
* removed from TriangleListing.
*
* CountVertices can be replaced by ".sum(1)" when Flink aggregators use
* code generation.
*/
@Override
public DataSet<Result<K>> run(Graph<K, VV, EV> input)
throws Exception {
// u, v, w, bitmask
DataSet<TriangleListing.Result<K>> triangles = input
.run(new TriangleListing<K,VV,EV>()
.setSortTriangleVertices(false)
.setLittleParallelism(littleParallelism));
// u, edge count
DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
.flatMap(new SplitTriangles<K>())
.name("Split triangle vertices");
// u, triangle count
DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
.groupBy(0)
.reduce(new CountTriangles<K>())
.name("Count triangles");
// u, deg(u)
DataSet<Vertex<K, Degrees>> vertexDegree = input
.run(new VertexDegrees<K, VV, EV>()
.setParallelism(littleParallelism)
.setIncludeZeroDegreeVertices(true));
// u, deg(u), triangle count
return vertexDegree
.leftOuterJoin(vertexTriangleCount)
.where(0)
.equalTo(0)
.with(new JoinVertexDegreeWithTriangleCount<K>())
.setParallelism(littleParallelism)
.name("Clustering coefficient");
}
/**
* Emits the three vertex IDs comprising each triangle along with an initial count.
*
* @param <T> ID type
*/
private static class SplitTriangles<T>
implements FlatMapFunction<TriangleListing.Result<T>, Tuple2<T, LongValue>> {
private LongValue one = new LongValue(1);
private LongValue two = new LongValue(2);
private Tuple2<T, LongValue> output = new Tuple2<>();
@Override
public void flatMap(TriangleListing.Result<T> value, Collector<Tuple2<T, LongValue>> out)
throws Exception {
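// each pair of bits in the bitmask records the direction(s) of one
// triangle edge; a vertex is emitted with a count of two when the edge
// joining the other two vertices is mutual, since a mutual edge
// contributes two edges between this vertex's neighbors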
byte bitmask = value.f3.getValue();
output.f0 = value.f0;
output.f1 = ((bitmask & 0b000011) == 0b000011) ? two : one;
out.collect(output);
output.f0 = value.f1;
output.f1 = ((bitmask & 0b001100) == 0b001100) ? two : one;
out.collect(output);
output.f0 = value.f2;
output.f1 = ((bitmask & 0b110000) == 0b110000) ? two : one;
out.collect(output);
}
}
/**
* Sums the triangle count for each vertex ID.
*
* @param <T> ID type
*/
@FunctionAnnotation.ForwardedFields("0")
private static class CountTriangles<T>
implements ReduceFunction<Tuple2<T, LongValue>> {
@Override
public Tuple2<T, LongValue> reduce(Tuple2<T, LongValue> left, Tuple2<T, LongValue> right)
throws Exception {
left.f1.setValue(left.f1.getValue() + right.f1.getValue());
return left;
}
}
/**
* Joins the vertex and degree with the vertex's triangle count.
*
* @param <T> ID type
*/
@FunctionAnnotation.ForwardedFieldsFirst("0; 1.0->1.0")
@FunctionAnnotation.ForwardedFieldsSecond("0")
private static class JoinVertexDegreeWithTriangleCount<T>
implements JoinFunction<Vertex<T, Degrees>, Tuple2<T, LongValue>, Result<T>> {
private LongValue zero = new LongValue(0);
private Result<T> output = new Result<>();
@Override
public Result<T> join(Vertex<T, Degrees> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount)
throws Exception {
output.f0 = vertexAndDegree.f0;
output.f1.f0 = vertexAndDegree.f1.f0;
output.f1.f1 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
return output;
}
}
/**
* Wraps the vertex type to encapsulate results from the Local Clustering Coefficient algorithm.
*
* @param <T> ID type
*/
public static class Result<T>
extends Vertex<T, Tuple2<LongValue, LongValue>> {
public static final int HASH_SEED = 0x37a208c4;
private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
/**
* No-args constructor.
*/
public Result() {
f1 = new Tuple2<>();
}
/**
* Get the vertex degree.
*
* @return vertex degree
*/
public LongValue getDegree() {
return f1.f0;
}
/**
* Get the number of triangles containing this vertex; equivalently,
* this is the number of edges between neighbors of this vertex.
*
* @return triangle count
*/
public LongValue getTriangleCount() {
return f1.f1;
}
/**
* Get the local clustering coefficient score. This is computed as the
* number of edges between neighbors, equal to the triangle count,
* divided by the number of potential edges between neighbors.
*
* A score of {@code Double.NaN} is returned for a vertex with degree 0 or 1,
* for which the number of potential neighbor pairs is zero.
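*
* For example, a vertex with degree 3 yields 3 * 2 = 6 potential neighbor
* pairs; with 3 edges between its neighbors the score is 3 / 6 = 0.5.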
*
* @return local clustering coefficient score
*/
public double getLocalClusteringCoefficientScore() {
long degree = getDegree().getValue();
long neighborPairs = degree * (degree - 1);
return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double)neighborPairs;
}
/**
* Format values into a human-readable string.
*
* @return verbose string
*/
public String toVerboseString() {
return "Vertex ID: " + f0
+ ", vertex degree: " + getDegree()
+ ", triangle count: " + getTriangleCount()
+ ", local clustering coefficient: " + getLocalClusteringCoefficientScore();
}
@Override
public int hashCode() {
return hasher.reset()
.hash(f0.hashCode())
.hash(f1.f0.getValue())
.hash(f1.f1.getValue())
.hash();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.library.clustering.directed;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils.CountHelper;
import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.AbstractID;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* Count the number of distinct triangles in a directed graph.
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
* @see TriangleListing
*/
public class TriangleCount<K extends Comparable<K> & CopyableValue<K>, VV, EV>
extends AbstractGraphAnalytic<K, VV, EV, Long> {
private String id = new AbstractID().toString();
// Optional configuration
private int littleParallelism = PARALLELISM_DEFAULT;
/**
* Override the parallelism of operators processing small amounts of data.
*
* @param littleParallelism operator parallelism
* @return this
*/
public TriangleCount<K, VV, EV> setLittleParallelism(int littleParallelism) {
this.littleParallelism = littleParallelism;
return this;
}
@Override
public TriangleCount<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
super.run(input);
DataSet<TriangleListing.Result<K>> triangles = input
.run(new TriangleListing<K, VV, EV>()
.setSortTriangleVertices(false)
.setLittleParallelism(littleParallelism));
triangles
.output(new CountHelper<TriangleListing.Result<K>>(id))
.name("Count triangles");
return this;
}
@Override
public Long getResult() {
return env.getLastJobExecutionResult().<Long> getAccumulatorResult(id);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.library.clustering.directed;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeOrder;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* Generates a listing of distinct triangles from the input graph.
* <br/>
* A triangle is a 3-clique with vertices A, B, and C connected by edges
* (A, B), (A, C), and (B, C).
* <br/>
* The input graph must not contain duplicate edges or self-loops.
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
*/
public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
// Optional configuration
private boolean sortTriangleVertices = false;
private int littleParallelism = PARALLELISM_DEFAULT;
/**
* Normalize the triangle listing such that for each result (K0, K1, K2)
* the vertex IDs are sorted K0 < K1 < K2.
*
* @param sortTriangleVertices whether to output each triangle's vertices in sorted order
* @return this
*/
public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
this.sortTriangleVertices = sortTriangleVertices;
return this;
}
/**
* Override the parallelism of operators processing small amounts of data.
*
* @param littleParallelism operator parallelism
* @return this
*/
public TriangleListing<K, VV, EV> setLittleParallelism(int littleParallelism) {
Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
"The parallelism must be greater than zero.");
this.littleParallelism = littleParallelism;
return this;
}
/*
* Implementation notes:
*
* The requirement that "K extends CopyableValue<K>" can be removed when
* Flink has a self-join and GenerateTriplets is implemented as such.
*
* ProjectTriangles should eventually be replaced by ".projectFirst("*")"
* when projections use code generation.
*/
@Override
public DataSet<Result<K>> run(Graph<K, VV, EV> input)
throws Exception {
// u, v, bitmask where u < v
DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
.getEdges()
.map(new OrderByID<K, EV>())
.setParallelism(littleParallelism)
.name("Order by ID")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<K>())
.setParallelism(littleParallelism)
.name("Flatten by ID");
// u, v, (deg(u), deg(v))
DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
.run(new EdgeDegreesPair<K, VV, EV>()
.setParallelism(littleParallelism));
// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
.map(new OrderByDegree<K, EV>())
.setParallelism(littleParallelism)
.name("Order by degree")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<K>())
.setParallelism(littleParallelism)
.name("Flatten by degree");
// u, v, w, bitmask where (u, v) and (u, w) are edges in graph
DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.reduceGroup(new GenerateTriplets<K>())
.setParallelism(littleParallelism)
.name("Generate triplets");
// u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
DataSet<Result<K>> triangles = triplets
.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
.where(1, 2)
.equalTo(0, 1)
.with(new ProjectTriangles<K>())
.setParallelism(littleParallelism)
.name("Triangle listing");
if (sortTriangleVertices) {
triangles = triangles
.map(new SortTriangleVertices<K>())
.name("Sort triangle vertices");
}
return triangles;
}
/**
* Removes edge values while emitting a Tuple3 where f0 and f1 are,
* respectively, the lesser and greater of the source and target IDs.
* The third field is a bitmask representing the vertex order.
*
* @param <T> ID type
* @param <ET> edge value type
*/
private static final class OrderByID<T extends Comparable<T>, ET>
implements MapFunction<Edge<T, ET>, Tuple3<T, T, ByteValue>> {
private ByteValue forward = new ByteValue(EdgeOrder.FORWARD.getBitmask());
private ByteValue reverse = new ByteValue(EdgeOrder.REVERSE.getBitmask());
private Tuple3<T, T, ByteValue> output = new Tuple3<>();
@Override
public Tuple3<T, T, ByteValue> map(Edge<T, ET> value)
throws Exception {
if (value.f0.compareTo(value.f1) < 0) {
output.f0 = value.f0;
output.f1 = value.f1;
output.f2 = forward;
} else {
output.f0 = value.f1;
output.f1 = value.f0;
output.f2 = reverse;
}
return output;
}
}
/**
* Reduce bitmasks to a single value using bitwise-or.
*
* @param <T> ID type
*/
@ForwardedFields("0; 1")
private static final class ReduceBitmask<T>
implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple3<T, T, ByteValue>> {
@Override
public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple3<T, T, ByteValue>> out)
throws Exception {
Tuple3<T, T, ByteValue> output = null;
byte bitmask = 0;
for (Tuple3<T, T, ByteValue> value: values) {
output = value;
bitmask |= value.f2.getValue();
}
output.f2.setValue(bitmask);
out.collect(output);
}
}
/**
* Removes edge values while emitting a Tuple3 where f0 and f1 are,
* respectively, the lesser and greater of the source and target IDs
* by degree count. If the source and target vertex degrees are equal
* then the IDs are compared and emitted in order. The third field is
* a bitmask representing the vertex order.
*
* @param <T> ID type
* @param <ET> edge value type
*/
private static final class OrderByDegree<T extends Comparable<T>, ET>
implements MapFunction<Edge<T, Tuple3<ET, Degrees, Degrees>>, Tuple3<T, T, ByteValue>> {
private ByteValue forward = new ByteValue((byte)(EdgeOrder.FORWARD.getBitmask() << 2));
private ByteValue reverse = new ByteValue((byte)(EdgeOrder.REVERSE.getBitmask() << 2));
private Tuple3<T, T, ByteValue> output = new Tuple3<>();
@Override
public Tuple3<T, T, ByteValue> map(Edge<T, Tuple3<ET, Degrees, Degrees>> value)
throws Exception {
Tuple3<ET, Degrees, Degrees> degrees = value.f2;
long sourceDegree = degrees.f1.getDegree().getValue();
long targetDegree = degrees.f2.getDegree().getValue();
if (sourceDegree < targetDegree ||
(sourceDegree == targetDegree && value.f0.compareTo(value.f1) < 0)) {
output.f0 = value.f0;
output.f1 = value.f1;
output.f2 = forward;
} else {
output.f0 = value.f1;
output.f1 = value.f0;
output.f2 = reverse;
}
return output;
}
}
/**
* Generates the set of triplets by the pairwise enumeration of the open
* neighborhood for each vertex. The number of triplets is quadratic in
* the vertex degree; however, data skew is minimized by only generating
* triplets from the vertex with least degree.
*
* @param <T> ID type
*/
@ForwardedFields("0")
private static final class GenerateTriplets<T extends CopyableValue<T>>
implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple4<T, T, T, ByteValue>> {
private Tuple4<T, T, T, ByteValue> output = new Tuple4<>(null, null, null, new ByteValue());
private List<Tuple2<T, ByteValue>> visited = new ArrayList<>();
@Override
public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple4<T, T, T, ByteValue>> out)
throws Exception {
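// pair each neighbor, visited in sorted order, with every previously
// seen neighbor; entries in the visited list are reused across groups
// to limit object allocation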
int visitedCount = 0;
Iterator<Tuple3<T, T, ByteValue>> iter = values.iterator();
while (true) {
Tuple3<T, T, ByteValue> edge = iter.next();
byte bitmask = edge.f2.getValue();
output.f0 = edge.f0;
output.f2 = edge.f1;
for (int i = 0; i < visitedCount; i++) {
Tuple2<T, ByteValue> previous = visited.get(i);
output.f1 = previous.f0;
output.f3.setValue((byte)(previous.f1.getValue() | bitmask));
// u, v, w, bitmask
out.collect(output);
}
if (! iter.hasNext()) {
break;
}
byte shiftedBitmask = (byte)(bitmask << 2);
if (visitedCount == visited.size()) {
visited.add(new Tuple2<>(edge.f1.copy(), new ByteValue(shiftedBitmask)));
} else {
Tuple2<T, ByteValue> update = visited.get(visitedCount);
edge.f1.copyTo(update.f0);
update.f1.setValue(shiftedBitmask);
}
visitedCount += 1;
}
}
}
/**
* Simply project the triplet as a triangle while collapsing triplet and edge bitmasks.
*
* @param <T> ID type
*/
@ForwardedFieldsFirst("0; 1; 2")
@ForwardedFieldsSecond("0; 1")
private static final class ProjectTriangles<T>
implements JoinFunction<Tuple4<T, T, T, ByteValue>, Tuple3<T, T, ByteValue>, Result<T>> {
private Result<T> output = new Result<>(null, null, null, new ByteValue());
@Override
public Result<T> join(Tuple4<T, T, T, ByteValue> triplet, Tuple3<T, T, ByteValue> edge)
throws Exception {
output.f0 = triplet.f0;
output.f1 = triplet.f1;
output.f2 = triplet.f2;
output.f3.setValue((byte)(triplet.f3.getValue() | edge.f2.getValue()));
return output;
}
}
/**
* Reorders the vertices of each emitted triangle (K0, K1, K2, bitmask)
* into sorted order such that K0 < K1 < K2.
*
* @param <T> ID type
*/
private static final class SortTriangleVertices<T extends Comparable<T>>
implements MapFunction<Result<T>, Result<T>> {
@Override
public Result<T> map(Result<T> value)
throws Exception {
// by the triangle listing algorithm we know f1 < f2
if (value.f0.compareTo(value.f1) > 0) {
byte bitmask = value.f3.getValue();
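// bit pairs (5-4, 3-2, 1-0) record the f0-f1, f0-f2, and f1-f2 edge
// directions; reordering the vertices permutes the pairs and, where the
// relative order of two vertices flips, swaps the bits within a pair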
T temp = value.f0;
value.f0 = value.f1;
if (temp.compareTo(value.f2) < 0) {
value.f1 = temp;
int f0f1 = ((bitmask & 0b100000) >>> 1) | ((bitmask & 0b010000) << 1);
int f0f2 = (bitmask & 0b001100) >>> 2;
int f1f2 = (bitmask & 0b000011) << 2;
value.f3.setValue((byte)(f0f1 | f0f2 | f1f2));
} else {
value.f1 = value.f2;
value.f2 = temp;
int f0f1 = (bitmask & 0b000011) << 4;
int f0f2 = ((bitmask & 0b100000) >>> 3) | ((bitmask & 0b010000) >>> 1);
int f1f2 = ((bitmask & 0b001000) >>> 3) | ((bitmask & 0b000100) >>> 1);
value.f3.setValue((byte)(f0f1 | f0f2 | f1f2));
}
}
return value;
}
}
/**
* Wraps the vertex type to encapsulate results from the Triangle Listing algorithm.
*
* @param <T> ID type
*/
public static class Result<T>
extends Tuple4<T, T, T, ByteValue> {
/**
* No-args constructor.
*/
public Result() {}
/**
* Populates parent tuple with constructor parameters.
*
* @param value0 1st triangle vertex ID
* @param value1 2nd triangle vertex ID
* @param value2 3rd triangle vertex ID
* @param value3 bitmask indicating presence of six possible edges between triangle vertices
*/
public Result(T value0, T value1, T value2, ByteValue value3) {
super(value0, value1, value2, value3);
}
/**
* Format values into a human-readable string.
*
* @return verbose string
*/
public String toVerboseString() {
byte bitmask = f3.getValue();
return "1st vertex ID: " + f0
+ ", 2nd vertex ID: " + f1
+ ", 3rd vertex ID: " + f2
+ ", edge directions: " + f0 + maskToString(bitmask, 4) + f1
+ ", " + f0 + maskToString(bitmask, 2) + f2
+ ", " + f1 + maskToString(bitmask, 0) + f2;
}
private String maskToString(byte mask, int shift) {
switch((mask >>> shift) & 0b000011) {
case 0b01:
// EdgeOrder.FORWARD
return "->";
case 0b10:
// EdgeOrder.REVERSE
return "<-";
case 0b11:
// EdgeOrder.MUTUAL
return "<->";
default:
throw new IllegalArgumentException("Bitmask is missing an edge (mask = "
+ mask + ", shift = " + shift);
}
}
}
}
......@@ -58,6 +58,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
return this;
}
/*
* Implementation notes:
*
* The requirement that "K extends CopyableValue<K>" can be removed when
* removed from TriangleListing.
*/
@Override
public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
......@@ -78,7 +85,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
@Override
public Result getResult() {
return new Result(vertexMetrics.getResult().getNumberOfTriplets(), 3 * triangleCount.getResult());
// each triangle is counted from each of the three vertices
long numberOfTriangles = 3 * triangleCount.getResult();
return new Result(vertexMetrics.getResult().getNumberOfTriplets(), numberOfTriangles);
}
/**
......@@ -86,8 +96,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
*/
public static class Result {
private long tripletCount;
private long triangleCount;
/**
* Instantiate an immutable result.
*
* @param tripletCount triplet count
* @param triangleCount triangle count
*/
public Result(long tripletCount, long triangleCount) {
this.tripletCount = tripletCount;
this.triangleCount = triangleCount;
......@@ -121,13 +138,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
*
* @return global clustering coefficient score
*/
public double getLocalClusteringCoefficientScore() {
public double getGlobalClusteringCoefficientScore() {
return (tripletCount == 0) ? Double.NaN : triangleCount / (double)tripletCount;
}
@Override
public String toString() {
return "triplet count: " + tripletCount + ", triangle count:" + triangleCount;
return "triplet count: " + tripletCount
+ ", triangle count: " + triangleCount
+ ", global clustering coefficient: " + getGlobalClusteringCoefficientScore();
}
@Override
......
......@@ -102,7 +102,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
// u, triangle count
DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
.groupBy(0)
.reduce(new CountVertices<K>())
.reduce(new CountTriangles<K>())
.name("Count triangles");
// u, deg(u)
......@@ -145,12 +145,12 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
}
/**
* Combines the count of each vertex ID.
* Sums the triangle count for each vertex ID.
*
* @param <T> ID type
*/
@FunctionAnnotation.ForwardedFields("0")
private static class CountVertices<T>
private static class CountTriangles<T>
implements ReduceFunction<Tuple2<T, LongValue>> {
@Override
public Tuple2<T, LongValue> reduce(Tuple2<T, LongValue> left, Tuple2<T, LongValue> right)
......@@ -185,7 +185,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
}
/**
* Wraps the vertex type to encapsulate results from the local clustering coefficient algorithm.
* Wraps the vertex type to encapsulate results from the Local Clustering Coefficient algorithm.
*
* @param <T> ID type
*/
......@@ -195,9 +195,6 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
/**
* The no-arg constructor instantiates contained objects.
*/
public Result() {
f1 = new Tuple2<>();
}
......@@ -238,6 +235,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double)neighborPairs;
}
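The score above is the fraction of neighbor pairs that close a triangle. A minimal illustration (hypothetical snippet), assuming the undirected convention where a vertex of degree d has d * (d - 1) / 2 unordered neighbor pairs:

public class LocalClusteringCoefficientExample {
    public static void main(String[] args) {
        long degree = 4;        // e.g. vertex 3 of the simple test graph
        long triangleCount = 1; // triangles containing that vertex
        long neighborPairs = degree * (degree - 1) / 2;
        double score = (neighborPairs == 0)
            ? Double.NaN
            : triangleCount / (double) neighborPairs;
        System.out.println(score); // 1/6 ~ 0.1667
    }
}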
/**
* Format values into a human-readable string.
*
* @return verbose string
*/
public String toVerboseString() {
return "Vertex ID: " + f0
+ ", vertex degree: " + getDegree()
......
......@@ -66,7 +66,9 @@ extends AbstractGraphAnalytic<K, VV, EV, Long> {
.setSortTriangleVertices(false)
.setLittleParallelism(littleParallelism));
triangles.output(new CountHelper<Tuple3<K, K, K>>(id)).name("Count triangles");
triangles
.output(new CountHelper<Tuple3<K, K, K>>(id))
.name("Count triangles");
return this;
}
......
......@@ -220,6 +220,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
*
* @param <T> ID type
*/
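// field 0 is emitted unmodified from the grouped input, so it is declared
// forwarded to let the optimizer reuse partitioning and sort order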
@ForwardedFields("0")
private static final class GenerateTriplets<T extends CopyableValue<T>>
implements GroupReduceFunction<Tuple2<T, T>, Tuple3<T, T, T>> {
private Tuple3<T, T, T> output = new Tuple3<>();
......@@ -269,9 +270,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
private static final class ProjectTriangles<T>
implements JoinFunction<Tuple3<T, T, T>, Tuple2<T, T>, Tuple3<T, T, T>> {
@Override
public Tuple3<T, T, T> join(Tuple3<T, T, T> first, Tuple2<T, T> second)
public Tuple3<T, T, T> join(Tuple3<T, T, T> triplet, Tuple2<T, T> edge)
throws Exception {
return first;
return triplet;
}
}
......
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.graph.library.metric.directed;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.AbstractID;
import java.io.IOException;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* Compute the number of vertices, number of edges, and number of triplets in
* a directed graph.
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
*/
public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
extends AbstractGraphAnalytic<K, VV, EV, Result> {
private String id = new AbstractID().toString();
// Optional configuration
private boolean includeZeroDegreeVertices = false;
private int parallelism = PARALLELISM_DEFAULT;
/**
* By default, only the edge set is processed for the computation of degrees.
* When this flag is set, an additional join is performed against the vertex
* set in order to output vertices with a degree of zero.
*
* @param includeZeroDegreeVertices whether to output vertices with a
* degree of zero
* @return this
*/
public VertexMetrics<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
this.includeZeroDegreeVertices = includeZeroDegreeVertices;
return this;
}
/**
* Override the operator parallelism.
*
* @param parallelism operator parallelism
* @return this
*/
public VertexMetrics<K, VV, EV> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
@Override
public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
super.run(input);
DataSet<Vertex<K, Degrees>> vertexDegree = input
.run(new VertexDegrees<K, VV, EV>()
.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
.setParallelism(parallelism));
vertexDegree
.output(new VertexMetricsHelper<K>(id))
.name("Vertex metrics");
return this;
}
@Override
public Result getResult() {
JobExecutionResult res = env.getLastJobExecutionResult();
long vertexCount = res.getAccumulatorResult(id + "-0");
long edgeCount = res.getAccumulatorResult(id + "-1");
long tripletCount = res.getAccumulatorResult(id + "-2");
return new Result(vertexCount, edgeCount / 2, tripletCount);
}
/**
* Helper class to collect vertex metrics.
*
* @param <T> ID type
*/
private static class VertexMetricsHelper<T>
extends RichOutputFormat<Vertex<T, Degrees>> {
private final String id;
private long vertexCount;
private long edgeCount;
private long tripletCount;
/**
* This helper class collects vertex metrics by scanning over and
* discarding elements from the given DataSet.
*
* The unique id is required because Flink's accumulator namespace is
* shared among all operators.
*
* @param id unique string used for accumulator names
*/
public VertexMetricsHelper(String id) {
this.id = id;
}
@Override
public void configure(Configuration parameters) {}
@Override
public void open(int taskNumber, int numTasks) throws IOException {}
@Override
public void writeRecord(Vertex<T, Degrees> record) throws IOException {
long degree = record.f1.getDegree().getValue();
long outDegree = record.f1.getOutDegree().getValue();
vertexCount++;
edgeCount += outDegree;
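// a vertex of degree d is the center of d * (d - 1) / 2 triplets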
tripletCount += degree * (degree - 1) / 2;
}
@Override
public void close() throws IOException {
getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount));
getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount));
}
}
/**
* Wraps vertex metrics.
*/
public static class Result {
private long vertexCount;
private long edgeCount;
private long tripletCount;
public Result(long vertexCount, long edgeCount, long tripletCount) {
this.vertexCount = vertexCount;
this.edgeCount = edgeCount;
this.tripletCount = tripletCount;
}
/**
* Get the number of vertices.
*
* @return number of vertices
*/
public long getNumberOfVertices() {
return vertexCount;
}
/**
* Get the number of edges.
*
* @return number of edges
*/
public long getNumberOfEdges() {
return edgeCount;
}
/**
* Get the number of triplets.
*
* @return number of triplets
*/
public long getNumberOfTriplets() {
return tripletCount;
}
@Override
public String toString() {
return "vertex count: " + vertexCount
+ ", edge count:" + edgeCount
+ ", triplet count: " + tripletCount;
}
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(vertexCount)
.append(edgeCount)
.append(tripletCount)
.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null) { return false; }
if (obj == this) { return true; }
if (obj.getClass() != getClass()) { return false; }
Result rhs = (Result)obj;
return new EqualsBuilder()
.append(vertexCount, rhs.vertexCount)
.append(edgeCount, rhs.edgeCount)
.append(tripletCount, rhs.tripletCount)
.isEquals();
}
}
}
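A hedged usage sketch (hypothetical driver class; the generator call assumes Gelly's CompleteGraph API): generate a small complete graph, run the analytic, and print the aggregated result.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.generator.CompleteGraph;
import org.apache.flink.graph.library.metric.directed.VertexMetrics;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

public class VertexMetricsExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // K4 with mutual edges: 4 vertices, 6 edge pairs, 4 * C(3,2) = 12 triplets
        Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, 4)
            .generate();
        VertexMetrics.Result result = new VertexMetrics<LongValue, NullValue, NullValue>()
            .run(graph)
            .execute();
        System.out.println(result);
    }
}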
......@@ -111,7 +111,9 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
.setReduceOnTargetId(reduceOnTargetId)
.setParallelism(parallelism));
vertexDegree.output(new VertexMetricsHelper<K>(id)).name("Vertex metrics");
vertexDegree
.output(new VertexMetricsHelper<K>(id))
.name("Vertex metrics");
return this;
}
......
......@@ -67,11 +67,11 @@ public class Murmur3_32 implements Serializable {
count++;
input *= 0xcc9e2d51;
input = input << 15;
input = Integer.rotateLeft(input, 15);
input *= 0x1b873593;
hash ^= input;
hash = hash << 13;
hash = Integer.rotateLeft(hash, 13);
hash = hash * 5 + 0xe6546b64;
return this;
......
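The fix above replaces a plain shift with a rotation: MurmurHash3 requires the high bits to wrap around rather than be discarded. A small standalone comparison (hypothetical):

public class RotateVsShift {
    public static void main(String[] args) {
        int input = 0xcc9e2d51;
        // the shift drops the top 15 bits; the rotation wraps them to the bottom
        System.out.println(Integer.toHexString(input << 15));
        System.out.println(Integer.toHexString(Integer.rotateLeft(input, 15)));
    }
}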
......@@ -64,11 +64,11 @@ public class AsmTestBase {
Object[][] edges = new Object[][] {
new Object[]{0, 1},
new Object[]{0, 2},
new Object[]{1, 2},
new Object[]{1, 3},
new Object[]{2, 1},
new Object[]{2, 3},
new Object[]{3, 1},
new Object[]{3, 4},
new Object[]{3, 5},
new Object[]{5, 3},
};
List<Edge<IntValue,NullValue>> directedEdgeList = new LinkedList<>();
......
......@@ -40,13 +40,13 @@ extends AsmTestBase {
public void testWithSimpleGraph()
throws Exception {
String expectedResult =
"(0,1,((null),(2,2,0),(3,2,1)))\n" +
"(0,2,((null),(2,2,0),(3,1,2)))\n" +
"(1,2,((null),(3,2,1),(3,1,2)))\n" +
"(1,3,((null),(3,2,1),(4,2,2)))\n" +
"(2,3,((null),(3,1,2),(4,2,2)))\n" +
"(0,1,((null),(2,2,0),(3,0,3)))\n" +
"(0,2,((null),(2,2,0),(3,2,1)))\n" +
"(2,1,((null),(3,2,1),(3,0,3)))\n" +
"(2,3,((null),(3,2,1),(4,2,2)))\n" +
"(3,1,((null),(4,2,2),(3,0,3)))\n" +
"(3,4,((null),(4,2,2),(1,0,1)))\n" +
"(3,5,((null),(4,2,2),(1,0,1)))";
"(5,3,((null),(1,1,0),(4,2,2)))";
DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degrees = directedSimpleGraph
.run(new EdgeDegreesPair<IntValue, NullValue, NullValue>());
......@@ -57,10 +57,10 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
ChecksumHashCode degreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
.run(new EdgeDegreesPair<LongValue, NullValue, NullValue>()));
assertEquals(12009, degreesChecksum.getCount());
assertEquals(0x00001660b256c74eL, degreesChecksum.getChecksum());
assertEquals(12009, checksum.getCount());
assertEquals(0x0000176fe94702a3L, checksum.getChecksum());
}
}
......@@ -42,11 +42,11 @@ extends AsmTestBase {
String expectedResult =
"(0,1,((null),(2,2,0)))\n" +
"(0,2,((null),(2,2,0)))\n" +
"(1,2,((null),(3,2,1)))\n" +
"(1,3,((null),(3,2,1)))\n" +
"(2,3,((null),(3,1,2)))\n" +
"(2,1,((null),(3,2,1)))\n" +
"(2,3,((null),(3,2,1)))\n" +
"(3,1,((null),(4,2,2)))\n" +
"(3,4,((null),(4,2,2)))\n" +
"(3,5,((null),(4,2,2)))";
"(5,3,((null),(1,1,0)))";
DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph
.run(new EdgeSourceDegrees<IntValue, NullValue, NullValue>());
......@@ -57,10 +57,10 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
ChecksumHashCode sourceDegreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
.run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>()));
assertEquals(12009, sourceDegreesChecksum.getCount());
assertEquals(0x000015c4731764b0L, sourceDegreesChecksum.getChecksum());
assertEquals(12009, checksum.getCount());
assertEquals(0x0000162435fde1d9L, checksum.getChecksum());
}
}
......@@ -40,13 +40,13 @@ extends AsmTestBase {
public void testWithSimpleGraph()
throws Exception {
String expectedResult =
"(0,1,((null),(3,2,1)))\n" +
"(0,2,((null),(3,1,2)))\n" +
"(1,2,((null),(3,1,2)))\n" +
"(1,3,((null),(4,2,2)))\n" +
"(0,1,((null),(3,0,3)))\n" +
"(0,2,((null),(3,2,1)))\n" +
"(2,1,((null),(3,0,3)))\n" +
"(2,3,((null),(4,2,2)))\n" +
"(3,1,((null),(3,0,3)))\n" +
"(3,4,((null),(1,0,1)))\n" +
"(3,5,((null),(1,0,1)))";
"(5,3,((null),(4,2,2)))";
DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph
.run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>());
......@@ -57,10 +57,10 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
ChecksumHashCode targetDegreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
.run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>()));
assertEquals(12009, targetDegreesChecksum.getCount());
assertEquals(0x000015e65749b923L, targetDegreesChecksum.getChecksum());
assertEquals(12009, checksum.getCount());
assertEquals(0x0000160af450cc81L, checksum.getChecksum());
}
}
......@@ -43,11 +43,11 @@ extends AsmTestBase {
String expectedResult =
"(0,(2,2,0))\n" +
"(1,(3,2,1))\n" +
"(2,(3,1,2))\n" +
"(1,(3,0,3))\n" +
"(2,(3,2,1))\n" +
"(3,(4,2,2))\n" +
"(4,(1,0,1))\n" +
"(5,(1,0,1))";
"(5,(1,1,0))";
TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
}
......@@ -95,10 +95,10 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
ChecksumHashCode degreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
.run(new VertexDegrees<LongValue, NullValue, NullValue>()));
assertEquals(902, degreesChecksum.getCount());
assertEquals(0x000001527b0f9e80L, degreesChecksum.getChecksum());
assertEquals(902, checksum.getCount());
assertEquals(0x000001a3305dd86aL, checksum.getChecksum());
}
}
......@@ -43,11 +43,11 @@ extends AsmTestBase {
String expectedResult =
"(0,0)\n" +
"(1,1)\n" +
"(2,2)\n" +
"(1,3)\n" +
"(2,1)\n" +
"(3,2)\n" +
"(4,1)\n" +
"(5,1)";
"(5,0)";
TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
}
......
......@@ -43,11 +43,11 @@ extends AsmTestBase {
String expectedResult =
"(0,2)\n" +
"(1,2)\n" +
"(2,1)\n" +
"(1,0)\n" +
"(2,2)\n" +
"(3,2)\n" +
"(4,0)\n" +
"(5,0)";
"(5,1)";
TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
}
......
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.graph.library.clustering.directed;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class GlobalClusteringCoefficientTest
extends AsmTestBase {
@Test
public void testWithSimpleGraph()
throws Exception {
Result expectedResult = new Result(13, 6);
Result globalClusteringCoefficient = new GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
.run(directedSimpleGraph)
.execute();
assertEquals(expectedResult, globalClusteringCoefficient);
}
@Test
public void testWithCompleteGraph()
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
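// in a complete graph every triplet closes, so triangle count equals triplet count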
Result expectedResult = new Result(expectedCount, expectedCount);
Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.run(completeGraph)
.execute();
assertEquals(expectedResult, globalClusteringCoefficient);
}
@Test
public void testWithEmptyGraph()
throws Exception {
Result expectedResult = new Result(0, 0);
Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.run(emptyGraph)
.execute();
assertEquals(expectedResult, globalClusteringCoefficient);
}
@Test
public void testWithRMatGraph()
throws Exception {
Result expectedResult = new Result(1003442, 225147);
Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.run(directedRMatGraph)
.execute();
assertEquals(expectedResult, globalClusteringCoefficient);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.library.clustering.directed;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class LocalClusteringCoefficientTest
extends AsmTestBase {
@Test
public void testSimpleGraph()
throws Exception {
DataSet<Result<IntValue>> cc = directedSimpleGraph
.run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
String expectedResult =
"(0,(2,1))\n" +
"(1,(3,2))\n" +
"(2,(3,2))\n" +
"(3,(4,1))\n" +
"(4,(1,0))\n" +
"(5,(1,0))";
TestBaseUtils.compareResultAsText(cc.collect(), expectedResult);
}
@Test
public void testCompleteGraph()
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
long expectedTriangleCount = 2 * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
DataSet<Result<LongValue>> cc = completeGraph
.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
List<Result<LongValue>> results = cc.collect();
assertEquals(completeGraphVertexCount, results.size());
for (Result<LongValue> result : results) {
assertEquals(expectedDegree, result.getDegree().getValue());
assertEquals(expectedTriangleCount, result.getTriangleCount().getValue());
}
}
@Test
public void testRMatGraph()
throws Exception {
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>()));
assertEquals(902, checksum.getCount());
assertEquals(0x000001bf83866775L, checksum.getChecksum());
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.graph.library.clustering.directed;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TriangleCountTest
extends AsmTestBase {
@Test
public void testWithSimpleGraph()
throws Exception {
long triangleCount = new TriangleCount<IntValue, NullValue, NullValue>()
.run(directedSimpleGraph)
.execute();
assertEquals(2, triangleCount);
}
@Test
public void testWithCompleteGraph()
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
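// n * C(n-1, 2) / 3 = C(n, 3), the number of distinct triangles in a complete graph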
long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
.run(completeGraph)
.execute();
assertEquals(expectedCount, triangleCount);
}
@Test
public void testWithEmptyGraph()
throws Exception {
long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
.run(emptyGraph)
.execute();
assertEquals(0, triangleCount);
}
@Test
public void testWithRMatGraph()
throws Exception {
long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
.run(directedRMatGraph)
.execute();
assertEquals(75049, triangleCount);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.library.clustering.directed;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TriangleListingTest
extends AsmTestBase {
@Test
public void testSimpleGraph()
throws Exception {
DataSet<Result<IntValue>> tl = directedSimpleGraph
.run(new TriangleListing<IntValue, NullValue, NullValue>()
.setSortTriangleVertices(true));
String expectedResult =
"(0,1,2,22)\n" +
"(1,2,3,41)";
TestBaseUtils.compareResultAsText(tl.collect(), expectedResult);
}
@Test
public void testCompleteGraph()
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
DataSet<Result<LongValue>> tl = completeGraph
.run(new TriangleListing<LongValue, NullValue, NullValue>());
List<Result<LongValue>> results = tl.collect();
assertEquals(expectedCount, results.size());
for (Result<LongValue> result : results) {
assertEquals(0b111111, result.f3.getValue());
}
}
@Test
public void testRMatGraph()
throws Exception {
DataSet<Result<LongValue>> tl = directedRMatGraph
.run(new TriangleListing<LongValue, NullValue, NullValue>()
.setSortTriangleVertices(true));
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl);
assertEquals(75049, checksum.getCount());
assertEquals(0x00000033111f1054L, checksum.getChecksum());
}
}
......@@ -76,12 +76,10 @@ extends AsmTestBase {
@Test
public void testRMatGraph()
throws Exception {
DataSet<Result<LongValue>> cc = undirectedRMatGraph
.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(cc);
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>()));
assertEquals(902, checksum.getCount());
assertEquals(0x000001b08e783277L, checksum.getChecksum());
assertEquals(0x000001cab2d3677bL, checksum.getChecksum());
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.graph.library.metric.directed;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class VertexMetricsTest
extends AsmTestBase {
@Test
public void testWithSimpleGraph()
throws Exception {
Result expectedResult = new Result(6, 7, 13);
Result vertexMetrics = new VertexMetrics<IntValue, NullValue, NullValue>()
.run(undirectedSimpleGraph)
.execute();
assertEquals(expectedResult, vertexMetrics);
}
@Test
public void testWithCompleteGraph()
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
long expectedEdges = completeGraphVertexCount * expectedDegree / 2;
long expectedTriplets = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets);
Result vertexMetrics = new VertexMetrics<LongValue, NullValue, NullValue>()
.run(completeGraph)
.execute();
assertEquals(expectedResult, vertexMetrics);
}
@Test
public void testWithEmptyGraph()
throws Exception {
Result expectedResult = new Result(0, 0, 0);
Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false)
.run(emptyGraph)
.execute();
assertEquals(expectedResult, withoutZeroDegreeVertices);
expectedResult = new Result(3, 0, 0);
Result withZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true)
.run(emptyGraph)
.execute();
assertEquals(expectedResult, withZeroDegreeVertices);
}
@Test
public void testWithRMatGraph()
throws Exception {
Result expectedResult = new Result(902, 10442, 1003442);
Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
.run(undirectedRMatGraph)
.execute();
assertEquals(expectedResult, withoutZeroDegreeVertices);
}
}
......@@ -132,6 +132,6 @@ extends AsmTestBase {
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(ji);
assertEquals(13954, checksum.getCount());
assertEquals(0x0000179f83a2a873L, checksum.getChecksum());
assertEquals(0x00001b1a1f7a9d0bL, checksum.getChecksum());
}
}