Commit 59cf7032 authored by Greg Hogan

[hotfix] [gelly] Driver updates

- refactor SimpleDriver to call internal plan method
- add CLI parameters for RMatGraph, AdamicAdar, JaccardIndex
- remove unused data from VertexDegrees
- JaccardIndex now filters on > rather than >=
- handle null in ValueArrayTypeInfo
- add NonForwardingIdentityMapper to GraphUtils
Parent fb1ef081
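For orientation before reading the hunks: a minimal, consolidated sketch of the driver pattern after the SimpleDriver refactoring listed above, assembled from the AdamicAdar changes in this diff. It is illustrative only and not part of the patch (the class name is hypothetical). A driver now extends SimpleDriver with the graph's type parameters plus its result type, returns the result DataSet from the new simplePlan method, and declares its outputs via the CSV/Print/Hash marker interfaces instead of implementing Driver directly.

package org.apache.flink.graph.drivers;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.output.CSV;
import org.apache.flink.graph.drivers.output.Print;
import org.apache.flink.graph.drivers.parameter.DoubleParameter;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
import org.apache.flink.types.CopyableValue;

public class AdamicAdarSketch<K extends CopyableValue<K>, VV, EV>
extends SimpleDriver<K, VV, EV, Result<K>>
implements CSV, Print {

	// CLI parameter registered with the driver's parameter framework
	private DoubleParameter minScore = new DoubleParameter(this, "minimum_score")
		.setDefaultValue(0.0)
		.setMinimumValue(0.0, true);

	@Override
	protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception {
		// the returned DataSet is stored by SimpleDriver#plan and later consumed
		// by the inherited hash()/print()/writeCSV() output methods
		return graph
			.run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>()
				.setMinimumScore(minScore.getValue().floatValue()));
	}
}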
......@@ -20,6 +20,7 @@ package org.apache.flink.graph;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.ParameterTool;
......@@ -188,20 +189,21 @@ public class Runner {
public static void main(String[] args) throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();
// should not have any non-Flink data types
env.getConfig().disableAutoTypeRegistration();
env.getConfig().disableForceAvro();
env.getConfig().disableForceKryo();
config.disableAutoTypeRegistration();
config.disableForceAvro();
config.disableForceKryo();
ParameterTool parameters = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(parameters);
config.setGlobalJobParameters(parameters);
// integration tests run with object reuse both disabled and enabled
if (parameters.has("__disable_object_reuse")) {
env.getConfig().disableObjectReuse();
config.disableObjectReuse();
} else {
env.getConfig().enableObjectReuse();
config.enableObjectReuse();
}
// Usage
......
......@@ -20,9 +20,11 @@ package org.apache.flink.graph.drivers;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.output.CSV;
import org.apache.flink.graph.drivers.output.Print;
import org.apache.flink.graph.drivers.parameter.DoubleParameter;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
import org.apache.flink.types.CopyableValue;
......@@ -33,8 +35,16 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}.
*/
public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
extends SimpleDriver<Result<K>>
implements Driver<K, VV, EV>, CSV, Print {
extends SimpleDriver<K, VV, EV, Result<K>>
implements CSV, Print {
private DoubleParameter minRatio = new DoubleParameter(this, "minimum_ratio")
.setDefaultValue(0.0)
.setMinimumValue(0.0, true);
private DoubleParameter minScore = new DoubleParameter(this, "minimum_score")
.setDefaultValue(0.0)
.setMinimumValue(0.0, true);
private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);
......@@ -61,11 +71,13 @@ implements Driver<K, VV, EV>, CSV, Print {
}
@Override
public void plan(Graph<K, VV, EV> graph) throws Exception {
protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception {
int lp = littleParallelism.getValue().intValue();
result = graph
return graph
.run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>()
.setMinimumRatio(minRatio.getValue().floatValue())
.setMinimumScore(minScore.getValue().floatValue())
.setLittleParallelism(lp));
}
}
......@@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.asm.result.PrintableResult;
......@@ -43,8 +44,8 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
*/
public class ClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
extends SimpleDriver<PrintableResult>
implements Driver<K, VV, EV>, CSV, Hash, Print {
extends SimpleDriver<K, VV, EV, PrintableResult>
implements CSV, Hash, Print {
private static final String DIRECTED = "directed";
......@@ -85,15 +86,11 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
}
@Override
public void plan(Graph<K, VV, EV> graph) throws Exception {
protected DataSet<PrintableResult> simplePlan(Graph<K, VV, EV> graph) throws Exception {
int lp = littleParallelism.getValue().intValue();
switch (order.getValue()) {
case DIRECTED:
result = graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
globalClusteringCoefficient = graph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
......@@ -101,13 +98,14 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
averageClusteringCoefficient = graph
.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
break;
case UNDIRECTED:
result = graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>()
@SuppressWarnings("unchecked")
DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
return directedResult;
case UNDIRECTED:
globalClusteringCoefficient = graph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
......@@ -115,7 +113,15 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
averageClusteringCoefficient = graph
.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
break;
@SuppressWarnings("unchecked")
DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
return undirectedResult;
default:
throw new RuntimeException("Unknown order: " + order);
}
}
......
......@@ -78,12 +78,9 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
@Override
public void print(String executionName) throws Exception {
Collect<Vertex<K, K>> collector = new Collect<>();
List<Vertex<K, K>> results = new Collect<Vertex<K, K>>().run(components).execute(executionName);
// Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761
List<Vertex<K, K>> records = collector.run(components).execute(executionName);
for (Vertex<K, K> result : records) {
for (Vertex<K, K> result : results) {
System.out.println(result);
}
}
......
......@@ -82,10 +82,7 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
@Override
public void print(String executionName) throws Exception {
Collect<Edge<K, EV>> collector = new Collect<>();
// Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761
List<Edge<K, EV>> records = collector.run(edges).execute(executionName);
List<Edge<K, EV>> records = new Collect<Edge<K, EV>>().run(edges).execute(executionName);
if (hasNullValueEdges(edges)) {
for (Edge<K, EV> result : records) {
......
......@@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.output.CSV;
import org.apache.flink.graph.drivers.output.Print;
......@@ -30,8 +31,8 @@ import org.apache.flink.graph.library.link_analysis.HITS.Result;
* Driver for {@link org.apache.flink.graph.library.link_analysis.HITS}.
*/
public class HITS<K, VV, EV>
extends SimpleDriver<Result<K>>
implements Driver<K, VV, EV>, CSV, Print {
extends SimpleDriver<K, VV, EV, Result<K>>
implements CSV, Print {
private static final int DEFAULT_ITERATIONS = 10;
......@@ -59,8 +60,8 @@ implements Driver<K, VV, EV>, CSV, Print {
}
@Override
public void plan(Graph<K, VV, EV> graph) throws Exception {
result = graph
protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception {
return graph
.run(new org.apache.flink.graph.library.link_analysis.HITS<K, VV, EV>(
iterationConvergence.getValue().iterations,
iterationConvergence.getValue().convergenceThreshold));
......
......@@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.output.CSV;
import org.apache.flink.graph.drivers.output.Hash;
......@@ -34,8 +35,24 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}.
*/
public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
extends SimpleDriver<Result<K>>
implements Driver<K, VV, EV>, CSV, Hash, Print {
extends SimpleDriver<K, VV, EV, Result<K>>
implements CSV, Hash, Print {
private LongParameter minNumerator = new LongParameter(this, "minimum_numerator")
.setDefaultValue(0)
.setMinimumValue(0);
private LongParameter minDenominator = new LongParameter(this, "minimum_denominator")
.setDefaultValue(1)
.setMinimumValue(1);
private LongParameter maxNumerator = new LongParameter(this, "maximum_numerator")
.setDefaultValue(1)
.setMinimumValue(0);
private LongParameter maxDenominator = new LongParameter(this, "maximum_denominator")
.setDefaultValue(1)
.setMinimumValue(1);
private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);
......@@ -64,11 +81,13 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
}
@Override
public void plan(Graph<K, VV, EV> graph) throws Exception {
protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception {
int lp = littleParallelism.getValue().intValue();
result = graph
return graph
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
.setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue())
.setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue())
.setLittleParallelism(lp));
}
}
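The four new score-bound parameters above map one-to-one onto the library's setMinimumScore/setMaximumScore setters. A minimal sketch of calling the library class directly with the same bounds (the wrapper class, method name, and concrete key/value types are illustrative choices, not part of the patch):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.similarity.JaccardIndex;
import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.NullValue;

class JaccardIndexUsageSketch {
	// keep only pairs whose Jaccard Index score lies in [1/10, 1/2]
	static DataSet<Result<IntValue>> boundedScores(Graph<IntValue, NullValue, NullValue> graph) throws Exception {
		return graph
			.run(new JaccardIndex<IntValue, NullValue, NullValue>()
				.setMinimumScore(1, 10)    // score >= 1/10
				.setMaximumScore(1, 2));   // score <= 1/2 (inclusive after this change)
	}
}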
......@@ -19,6 +19,7 @@
package org.apache.flink.graph.drivers;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.output.CSV;
import org.apache.flink.graph.drivers.output.Print;
......@@ -30,8 +31,8 @@ import org.apache.flink.graph.library.link_analysis.PageRank.Result;
* @see org.apache.flink.graph.library.link_analysis.PageRank
*/
public class PageRank<K, VV, EV>
extends SimpleDriver<Result<K>>
implements Driver<K, VV, EV>, CSV, Print {
extends SimpleDriver<K, VV, EV, Result<K>>
implements CSV, Print {
private static final int DEFAULT_ITERATIONS = 10;
......@@ -64,8 +65,8 @@ implements Driver<K, VV, EV>, CSV, Print {
}
@Override
public void plan(Graph<K, VV, EV> graph) throws Exception {
result = graph
protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception {
return graph
.run(new org.apache.flink.graph.library.link_analysis.PageRank<K, VV, EV>(
dampingFactor.getValue(),
iterationConvergence.getValue().iterations,
......
......@@ -19,6 +19,7 @@
package org.apache.flink.graph.drivers;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.asm.dataset.Collect;
......@@ -33,30 +34,74 @@ import java.util.List;
*
* @param <R> algorithm's result type
*/
public abstract class SimpleDriver<R extends PrintableResult>
extends ParameterizedBase {
public abstract class SimpleDriver<K, VV, EV, R extends PrintableResult>
extends ParameterizedBase
implements Driver<K, VV, EV> {
protected DataSet<? extends R> result;
private DataSet<R> result;
protected DataSet<R> getResult() {
return result;
}
/**
* Plan the algorithm and return the result {@link DataSet}.
*
* @param graph input graph
* @return driver output
* @throws Exception on error
*/
protected abstract DataSet<R> simplePlan(Graph<K, VV, EV> graph) throws Exception;
@Override
public void plan(Graph<K, VV, EV> graph) throws Exception {
result = simplePlan(graph);
}
/**
* Print hash of execution results.
*
* Does *not* implement/override {@code Hash} since {@link Driver}
* implementations designate the appropriate outputs.
*
* @param executionName job name
* @throws Exception on error
*/
public void hash(String executionName) throws Exception {
Checksum checksum = new ChecksumHashCode<R>()
.run((DataSet<R>) result)
.run(result)
.execute(executionName);
System.out.println(checksum);
}
/**
* Print execution results.
*
* Does *not* implement/override {@code Print} since {@link Driver}
* implementations designate the appropriate outputs.
*
* @param executionName job name
* @throws Exception on error
*/
public void print(String executionName) throws Exception {
Collect<R> collector = new Collect<>();
// Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761
List<R> records = collector.run((DataSet<R>) result).execute(executionName);
List<R> results = new Collect<R>().run(result).execute(executionName);
for (R result : records) {
for (R result : results) {
System.out.println(result.toPrintableString());
}
}
/**
* Write execution results to file using CSV format.
*
* Does *not* implement/override {@code CSV} since {@link Driver}
* implementations designate the appropriate outputs.
*
* @param filename output filename
* @param lineDelimiter CSV delimiter between lines
* @param fieldDelimiter CSV delimiter between fields
*/
public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
result
.writeAsCsv(filename, lineDelimiter, fieldDelimiter)
......
......@@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.asm.result.PrintableResult;
......@@ -42,8 +43,8 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus
*/
public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
extends SimpleDriver<PrintableResult>
implements Driver<K, VV, EV>, CSV, Hash, Print {
extends SimpleDriver<K, VV, EV, PrintableResult>
implements CSV, Hash, Print {
private static final String DIRECTED = "directed";
......@@ -83,35 +84,40 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
}
@Override
public void plan(Graph<K, VV, EV> graph) throws Exception {
protected DataSet<PrintableResult> simplePlan(Graph<K, VV, EV> graph) throws Exception {
int lp = littleParallelism.getValue().intValue();
switch (order.getValue()) {
case DIRECTED:
result = graph
.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
.setSortTriangleVertices(sortTriangleVertices.getValue())
.setLittleParallelism(lp));
if (computeTriadicCensus.getValue()) {
triadicCensus = graph
.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>()
.setLittleParallelism(lp));
}
break;
case UNDIRECTED:
result = graph
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>()
@SuppressWarnings("unchecked")
DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
.setSortTriangleVertices(sortTriangleVertices.getValue())
.setLittleParallelism(lp));
return directedResult;
case UNDIRECTED:
if (computeTriadicCensus.getValue()) {
triadicCensus = graph
.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>()
.setLittleParallelism(lp));
}
break;
@SuppressWarnings("unchecked")
DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>()
.setSortTriangleVertices(sortTriangleVertices.getValue())
.setLittleParallelism(lp));
return undirectedResult;
default:
throw new RuntimeException("Unknown order: " + order);
}
}
......
......@@ -96,6 +96,9 @@ implements Input<K, NullValue, NullValue> {
.setMinimumValue(0.0, true)
.setMaximumValue(2.0, true);
private LongParameter seed = new LongParameter(this, "seed")
.setDefaultValue(JDKRandomGeneratorFactory.DEFAULT_SEED);
private Simplify simplify = new Simplify(this);
private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
......
......@@ -21,11 +21,11 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeOrder;
......@@ -118,7 +118,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s, t, bitmask
DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = input.getEdges()
DataSet<Tuple2<K, ByteValue>> vertexWithEdgeOrder = input.getEdges()
.flatMap(new EmitAndFlipEdge<K, EV>())
.setParallelism(parallelism)
.name("Emit and flip edge")
......@@ -128,9 +128,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
.name("Reduce bitmask");
// s, d(s)
DataSet<Vertex<K, Degrees>> vertexDegrees = edgesWithOrder
DataSet<Vertex<K, Degrees>> vertexDegrees = vertexWithEdgeOrder
.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.reduceGroup(new DegreeCount<K>())
.setParallelism(parallelism)
.name("Degree count");
......@@ -178,22 +177,22 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
*
* @param <T> ID type
*/
@ForwardedFields("0; 1")
@ForwardedFields("0")
private static final class ReduceBitmask<T>
implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple3<T, T, ByteValue>> {
implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple2<T, ByteValue>> {
private Tuple2<T, ByteValue> output = new Tuple2<>(null, new ByteValue());
@Override
public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple3<T, T, ByteValue>> out)
public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple2<T, ByteValue>> out)
throws Exception {
Tuple3<T, T, ByteValue> output = null;
byte bitmask = 0;
for (Tuple3<T, T, ByteValue> value: values) {
output = value;
output.f0 = value.f0;
bitmask |= value.f2.getValue();
}
output.f2.setValue(bitmask);
output.f1.setValue(bitmask);
out.collect(output);
}
}
......@@ -203,21 +202,22 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
*
* @param <T> ID type
*/
@ForwardedFields("0")
private static class DegreeCount<T>
implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Vertex<T, Degrees>> {
implements GroupReduceFunction<Tuple2<T, ByteValue>, Vertex<T, Degrees>> {
private Vertex<T, Degrees> output = new Vertex<>(null, new Degrees());
@Override
public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Vertex<T, Degrees>> out)
public void reduce(Iterable<Tuple2<T, ByteValue>> values, Collector<Vertex<T, Degrees>> out)
throws Exception {
long degree = 0;
long outDegree = 0;
long inDegree = 0;
for (Tuple3<T, T, ByteValue> edge : values) {
for (Tuple2<T, ByteValue> edge : values) {
output.f0 = edge.f0;
byte bitmask = edge.f2.getValue();
byte bitmask = edge.f1.getValue();
degree++;
......
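For readers unfamiliar with the degree-counting scheme: ReduceBitmask above OR-combines one byte flag per incident edge into a single bitmask per group, and after this change only the grouping key and the bitmask are emitted. A tiny self-contained illustration of the combining pattern (the flag values are made-up stand-ins for the EdgeOrder bitmasks):

public class BitmaskReduceSketch {
	public static void main(String[] args) {
		final byte FORWARD = 0b01;    // assumed encoding, for illustration only
		final byte REVERSE = 0b10;    // assumed encoding, for illustration only

		byte bitmask = 0;
		for (byte edgeFlag : new byte[] { FORWARD, REVERSE, FORWARD }) {
			bitmask |= edgeFlag;      // accumulate direction flags across the group
		}

		System.out.println(bitmask);  // 3: both directions were seen
	}
}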
......@@ -25,13 +25,15 @@ import org.apache.flink.types.DoubleValue;
class Functions {
private Functions() {}
/**
* Sum vertices' scores.
*
* @param <T> ID type
*/
@ForwardedFields("0")
static class SumScore<T>
protected static final class SumScore<T>
implements ReduceFunction<Tuple2<T, DoubleValue>> {
@Override
public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)
......
......@@ -45,6 +45,7 @@ import org.apache.flink.graph.asm.result.UnaryResult;
import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
import org.apache.flink.graph.library.link_analysis.PageRank.Result;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.graph.utils.GraphUtils.IdentityMapper;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.DoubleValue;
......@@ -175,6 +176,10 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.run(new VertexDegrees<K, VV, EV>()
.setParallelism(parallelism));
// prevent Exception "The dam has been closed." in TempBarrier
// for a simplified Graph as in PageRankITCase (see FLINK-5623)
vertexDegree = vertexDegree.map(new IdentityMapper<Vertex<K, Degrees>>());
// vertex count
DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
......
......@@ -78,7 +78,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
private int maximumScoreNumerator = 1;
private int maximumScoreDenominator = 0;
private int maximumScoreDenominator = 1;
private int littleParallelism = PARALLELISM_DEFAULT;
......@@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
}
/**
* Filter out Jaccard Index scores greater than or equal to the given maximum fraction.
* Filter out Jaccard Index scores greater than the given maximum fraction.
*
* @param numerator numerator of the maximum score
* @param denominator denominator of the maximum score
......@@ -253,6 +253,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* number of groups and {@link GenerateGroups} emits each edge into each group.
*
* @param <T> ID type
* @param <ET> edge value type
*/
@ForwardedFields("0->1; 1->2")
private static class GenerateGroupSpans<T, ET>
......@@ -439,7 +440,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
if (unboundedScores ||
(count * minimumScoreDenominator >= distinctNeighbors * minimumScoreNumerator
&& count * maximumScoreDenominator < distinctNeighbors * maximumScoreNumerator)) {
&& count * maximumScoreDenominator <= distinctNeighbors * maximumScoreNumerator)) {
output.f0 = edge.f0;
output.f1 = edge.f1;
output.f2.setValue(count);
......
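A small standalone check of the boundary change above, with made-up numbers. With a maximum score of 1/2, a vertex pair with count = 2 shared neighbors out of distinctNeighbors = 4 scores exactly 1/2; the old strict comparison dropped it, the new inclusive comparison keeps it, matching the Javadoc change from "greater than or equal to" to "greater than":

public class JaccardBoundarySketch {
	public static void main(String[] args) {
		int count = 2;                // shared neighbors of the pair
		int distinctNeighbors = 4;    // distinct neighbors of the pair
		int maxNum = 1, maxDen = 2;   // maximum score of 1/2

		boolean keptBefore = count * maxDen < distinctNeighbors * maxNum;   // 4 < 4  -> false
		boolean keptNow = count * maxDen <= distinctNeighbors * maxNum;     // 4 <= 4 -> true

		System.out.println(keptBefore + " " + keptNow);                     // false true
	}
}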
......@@ -30,6 +30,7 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions;
import java.util.HashMap;
import java.util.Map;
......@@ -54,7 +55,7 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
public ValueArrayTypeInfo(TypeInformation<T> valueType) {
this.valueType = valueType;
this.type = valueType.getTypeClass();
this.type = valueType == null ? null : valueType.getTypeClass();
}
@Override
......@@ -85,12 +86,16 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
@Override
public boolean isKeyType() {
Preconditions.checkNotNull(type, "TypeInformation type class is required");
return Comparable.class.isAssignableFrom(type);
}
@Override
@SuppressWarnings("unchecked")
public TypeSerializer<ValueArray<T>> createSerializer(ExecutionConfig executionConfig) {
Preconditions.checkNotNull(type, "TypeInformation type class is required");
if (IntValue.class.isAssignableFrom(type)) {
return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new IntValueArraySerializer();
} else if (LongValue.class.isAssignableFrom(type)) {
......@@ -107,6 +112,8 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public TypeComparator<ValueArray<T>> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
Preconditions.checkNotNull(type, "TypeInformation type class is required");
if (IntValue.class.isAssignableFrom(type)) {
return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new IntValueArrayComparator(sortOrderAscending);
} else if (LongValue.class.isAssignableFrom(type)) {
......@@ -131,6 +138,8 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
@Override
public int hashCode() {
Preconditions.checkNotNull(type, "TypeInformation type class is required");
return type.hashCode();
}
......@@ -154,6 +163,8 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
@Override
public String toString() {
Preconditions.checkNotNull(type, "TypeInformation type class is required");
return "ValueArrayType<" + type.getSimpleName() + ">";
}
}
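A minimal sketch of the null handling added above (the import path org.apache.flink.graph.types.valuearray is assumed): constructing ValueArrayTypeInfo with a null element type is now tolerated, and the methods that need the element class fail fast with the Preconditions message.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.graph.types.valuearray.ValueArrayTypeInfo;
import org.apache.flink.types.IntValue;

public class ValueArrayTypeInfoNullSketch {
	public static void main(String[] args) {
		// a null element type is now accepted by the constructor
		ValueArrayTypeInfo<IntValue> typeInfo = new ValueArrayTypeInfo<IntValue>(null);

		try {
			// methods requiring the element class now check it up front
			typeInfo.createSerializer(new ExecutionConfig());
		} catch (NullPointerException e) {
			System.out.println(e.getMessage());   // "TypeInformation type class is required"
		}
	}
}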
......@@ -36,6 +36,9 @@ public class ValueArrayTypeInfoFactory<T> extends TypeInfoFactory<ValueArray<T>>
@Override
public TypeInformation<ValueArray<T>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
return new ValueArrayTypeInfo(genericParameters.get("T"));
@SuppressWarnings("unchecked")
TypeInformation<ValueArray<T>> typeInfo = new ValueArrayTypeInfo(genericParameters.get("T"));
return typeInfo;
}
}
......@@ -22,8 +22,10 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.graph.asm.translate.TranslateFunction;
import org.apache.flink.types.LongValue;
import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
......@@ -51,6 +53,7 @@ public class GraphUtils {
*
* @param <T> element type
*/
@ForwardedFields("*")
public static final class IdentityMapper<T>
implements MapFunction<T, T> {
public T map(T value) {
......@@ -58,6 +61,20 @@ public class GraphUtils {
}
}
/**
* The identity mapper returns the input as output.
*
* This does not forward fields and is used to break an operator chain.
*
* @param <T> element type
*/
public static final class NonForwardingIdentityMapper<T>
implements MapFunction<T, T> {
public T map(T value) {
return value;
}
}
/**
* Map each element to a value.
*
......@@ -65,7 +82,7 @@ public class GraphUtils {
* @param <O> output type
*/
public static class MapTo<I, O>
implements MapFunction<I, O>, ResultTypeQueryable<O> {
implements MapFunction<I, O>, ResultTypeQueryable<O>, TranslateFunction<I, O> {
private final O value;
/**
......@@ -78,7 +95,13 @@ public class GraphUtils {
}
@Override
public O map(I o) throws Exception {
public O map(I input) throws Exception {
return value;
}
@Override
public O translate(I input, O reuse)
throws Exception {
return value;
}
......
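A minimal usage sketch contrasting the two identity mappers defined above (the surrounding class and data set are illustrative): IdentityMapper now declares @ForwardedFields("*") so the optimizer knows every field passes through unchanged, while NonForwardingIdentityMapper deliberately declares nothing and is used where an operator chain needs to be broken.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.types.LongValue;

public class IdentityMapperSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<LongValue> counts = env.fromElements(new LongValue(1), new LongValue(2));

		// identity map with forwarded fields: a no-op the optimizer can see through
		DataSet<LongValue> forwarded = counts
			.map(new GraphUtils.IdentityMapper<LongValue>());

		// identity map without forwarded fields: same values, but breaks the chain
		DataSet<LongValue> unchained = counts
			.map(new GraphUtils.NonForwardingIdentityMapper<LongValue>());

		forwarded.print();
		unchained.print();
	}
}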
......@@ -81,6 +81,8 @@ extends AsmTestBase {
String expectedResult =
"(0,1,1,4)\n" +
"(0,2,1,4)\n" +
"(0,3,2,4)\n" +
"(1,2,2,4)\n" +
"(1,3,1,6)\n" +
"(1,4,1,3)\n" +
"(1,5,1,3)\n" +
......