Commit 149e7a01 authored by Greg Hogan

[FLINK-3965] [gelly] Delegating GraphAlgorithm

A delegating GraphAlgorithm wraps a GraphAlgorithm result with a
delegating proxy object. The delegated object can be replaced when the
same algorithm is run on the same input with a mergeable configuration.
This allows algorithms to be composed of implicitly reusable algorithms
without publicly sharing intermediate DataSets.

This closes #2032
Parent 7ab6837f
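
For orientation, a minimal sketch of the pattern the hunks below apply to each algorithm: the class extends the new delegating base class instead of implementing GraphAlgorithm directly, identifies itself by name, declares how two configurations merge, and moves the body of run(Graph) into runInternal(Graph). The class name MyDegreeAlgorithm and its one-line runInternal body are hypothetical; only the overridden method shapes mirror the patch.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.types.LongValue;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

public class MyDegreeAlgorithm<K, VV, EV>
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple2<K, LongValue>> {

	// Optional configuration
	private int parallelism = PARALLELISM_DEFAULT;

	@Override
	protected String getAlgorithmName() {
		// algorithms are identified by name rather than by class to allow subclassing
		return MyDegreeAlgorithm.class.getName();
	}

	@Override
	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
		// return true only when the two configurations are compatible; the base
		// class then reuses the cached result and swaps the delegated DataSet
		if (! MyDegreeAlgorithm.class.isAssignableFrom(other.getClass())) {
			return false;
		}

		MyDegreeAlgorithm rhs = (MyDegreeAlgorithm) other;

		parallelism = Math.min(parallelism, rhs.parallelism);

		return true;
	}

	@Override
	public DataSet<Tuple2<K, LongValue>> runInternal(Graph<K, VV, EV> input)
			throws Exception {
		// the body formerly in run(Graph) moves here; the base class wraps the
		// returned DataSet in a delegating proxy before handing it to callers
		return input.inDegrees();
	}
}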
......@@ -1749,7 +1749,7 @@ public abstract class DataSet<T> {
// --------------------------------------------------------------------------------------------
protected static void checkSameExecutionContext(DataSet<?> set1, DataSet<?> set2) {
if (set1.context != set2.context) {
if (set1.getExecutionEnvironment() != set2.getExecutionEnvironment()) {
throw new IllegalArgumentException("The two inputs have different execution contexts.");
}
}
......
......@@ -24,10 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
......@@ -40,7 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeDegreesPair<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> {
// Optional configuration
private int parallelism = PARALLELISM_DEFAULT;
......@@ -58,7 +59,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees
}
@Override
public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return EdgeDegreesPair.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {
return false;
}
EdgeDegreesPair rhs = (EdgeDegreesPair) other;
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s, t, d(s)
DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
......
......@@ -23,10 +23,11 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
......@@ -39,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeSourceDegrees<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
// Optional configuration
private int parallelism = PARALLELISM_DEFAULT;
......@@ -57,7 +58,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
}
@Override
public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return EdgeSourceDegrees.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
return false;
}
EdgeSourceDegrees rhs = (EdgeSourceDegrees) other;
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s, d(s)
DataSet<Vertex<K, Degrees>> vertexDegrees = input
......
......@@ -23,10 +23,11 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
......@@ -39,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeTargetDegrees<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
// Optional configuration
private int parallelism = PARALLELISM_DEFAULT;
......@@ -57,7 +58,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
}
@Override
public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return EdgeTargetDegrees.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
return false;
}
EdgeTargetDegrees rhs = (EdgeTargetDegrees) other;
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// t, d(t)
DataSet<Vertex<K, Degrees>> vertexDegrees = input
......
......@@ -30,13 +30,15 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeOrder;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
......@@ -48,10 +50,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexDegrees<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
// Optional configuration
private boolean includeZeroDegreeVertices = false;
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
private int parallelism = PARALLELISM_DEFAULT;
......@@ -65,7 +67,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
* @return this
*/
public VertexDegrees<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
this.includeZeroDegreeVertices = includeZeroDegreeVertices;
this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
return this;
}
......@@ -83,7 +85,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
}
@Override
public DataSet<Vertex<K, Degrees>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return VertexDegrees.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! VertexDegrees.class.isAssignableFrom(other.getClass())) {
return false;
}
VertexDegrees rhs = (VertexDegrees) other;
// verify that configurations can be merged
if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
return false;
}
// merge configurations
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s, t, bitmask
DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = input.getEdges()
......@@ -103,7 +134,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
.setParallelism(parallelism)
.name("Degree count");
if (includeZeroDegreeVertices) {
if (includeZeroDegreeVertices.get()) {
vertexDegrees = input.getVertices()
.leftOuterJoin(vertexDegrees)
.where(0)
......
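
The hunks above and below replace plain boolean options with OptionalBoolean from org.apache.flink.graph.utils.proxy, which distinguishes an unset option from an explicitly set one so that two configurations can be merged or rejected. That class is not part of this excerpt; the following is a rough sketch, under stated assumptions, of the semantics its call sites (set, get, conflictsWith, mergeWith, and the two-argument constructor) rely on. The constructor argument names, internal state, and method bodies here are guesses for illustration, not the actual implementation.

// Hypothetical reconstruction of the OptionalBoolean helper used in this commit;
// only its call sites appear in the diff, so all details below are assumptions.
public class OptionalBoolean {

	private enum State { UNSET, FALSE, TRUE, CONFLICTING }

	private final boolean valueIfUnset;        // e.g. new OptionalBoolean(false, true):
	private final boolean valueIfConflicting;  //   get() is false until set, true after a conflict

	private State state = State.UNSET;

	public OptionalBoolean(boolean valueIfUnset, boolean valueIfConflicting) {
		this.valueIfUnset = valueIfUnset;
		this.valueIfConflicting = valueIfConflicting;
	}

	public void set(boolean value) {
		this.state = value ? State.TRUE : State.FALSE;
	}

	public boolean get() {
		switch (state) {
			case TRUE: return true;
			case FALSE: return false;
			case CONFLICTING: return valueIfConflicting;
			default: return valueIfUnset;
		}
	}

	// true when both sides were explicitly set to opposite values
	public boolean conflictsWith(OptionalBoolean other) {
		return (state == State.TRUE && other.state == State.FALSE)
			|| (state == State.FALSE && other.state == State.TRUE);
	}

	// adopt the other side's value when this side is unset; mark conflicts otherwise
	public void mergeWith(OptionalBoolean other) {
		if (state == State.UNSET) {
			state = other.state;
		} else if (state != other.state && other.state != State.UNSET) {
			state = State.CONFLICTING;
		}
	}
}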
......@@ -20,11 +20,12 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
......@@ -38,10 +39,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexInDegree<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
// Optional configuration
private boolean includeZeroDegreeVertices = false;
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
private int parallelism = PARALLELISM_DEFAULT;
......@@ -55,7 +56,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
* @return this
*/
public VertexInDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
this.includeZeroDegreeVertices = includeZeroDegreeVertices;
this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
return this;
}
......@@ -76,7 +77,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
}
@Override
public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return VertexInDegree.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! VertexInDegree.class.isAssignableFrom(other.getClass())) {
return false;
}
VertexInDegree rhs = (VertexInDegree) other;
// verify that configurations can be merged
if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
return false;
}
// merge configurations
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// t
DataSet<Vertex<K, LongValue>> targetIds = input
......@@ -92,7 +122,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
.setParallelism(parallelism)
.name("Degree count");
if (includeZeroDegreeVertices) {
if (includeZeroDegreeVertices.get()) {
targetDegree = input.getVertices()
.leftOuterJoin(targetDegree)
.where(0)
......
......@@ -20,11 +20,12 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
......@@ -38,10 +39,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexOutDegree<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
// Optional configuration
private boolean includeZeroDegreeVertices = false;
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
private int parallelism = PARALLELISM_DEFAULT;
......@@ -55,7 +56,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
* @return this
*/
public VertexOutDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
this.includeZeroDegreeVertices = includeZeroDegreeVertices;
this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
return this;
}
......@@ -76,7 +77,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
}
@Override
public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return VertexOutDegree.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! VertexOutDegree.class.isAssignableFrom(other.getClass())) {
return false;
}
VertexOutDegree rhs = (VertexOutDegree) other;
// verify that configurations can be merged
if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
return false;
}
// merge configurations
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s
DataSet<Vertex<K, LongValue>> sourceIds = input
......@@ -92,7 +122,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
.setParallelism(parallelism)
.name("Degree count");
if (includeZeroDegreeVertices) {
if (includeZeroDegreeVertices.get()) {
sourceDegree = input.getVertices()
.leftOuterJoin(sourceDegree)
.where(0)
......
......@@ -24,9 +24,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
......@@ -41,10 +42,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeDegreePair<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, LongValue>>> {
// Optional configuration
protected boolean reduceOnTargetId = false;
private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
private int parallelism = PARALLELISM_DEFAULT;
......@@ -58,7 +59,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
* @return this
*/
public EdgeDegreePair<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
this.reduceOnTargetId = reduceOnTargetId;
this.reduceOnTargetId.set(reduceOnTargetId);
return this;
}
......@@ -79,18 +80,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
}
@Override
public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return EdgeDegreePair.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeDegreePair.class.isAssignableFrom(other.getClass())) {
return false;
}
EdgeDegreePair rhs = (EdgeDegreePair) other;
reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s, t, d(s)
DataSet<Edge<K, Tuple2<EV, LongValue>>> edgeSourceDegrees = input
.run(new EdgeSourceDegree<K, VV, EV>()
.setReduceOnTargetId(reduceOnTargetId)
.setReduceOnTargetId(reduceOnTargetId.get())
.setParallelism(parallelism));
// t, d(t)
DataSet<Vertex<K, LongValue>> vertexDegrees = input
.run(new VertexDegree<K, VV, EV>()
.setReduceOnTargetId(reduceOnTargetId)
.setReduceOnTargetId(reduceOnTargetId.get())
.setParallelism(parallelism));
// s, t, (d(s), d(t))
......
......@@ -23,9 +23,10 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
......@@ -39,10 +40,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeSourceDegree<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
// Optional configuration
private boolean reduceOnTargetId = false;
private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
private int parallelism = PARALLELISM_DEFAULT;
......@@ -56,7 +57,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
* @return this
*/
public EdgeSourceDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
this.reduceOnTargetId = reduceOnTargetId;
this.reduceOnTargetId.set(reduceOnTargetId);
return this;
}
......@@ -77,12 +78,33 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
}
@Override
public DataSet<Edge<K, Tuple2<EV, LongValue>>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return EdgeSourceDegree.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
return false;
}
EdgeSourceDegree rhs = (EdgeSourceDegree) other;
reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s, d(s)
DataSet<Vertex<K, LongValue>> vertexDegrees = input
.run(new VertexDegree<K, VV, EV>()
.setReduceOnTargetId(reduceOnTargetId)
.setReduceOnTargetId(reduceOnTargetId.get())
.setParallelism(parallelism));
// s, t, d(s)
......
......@@ -23,9 +23,10 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
......@@ -39,10 +40,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeTargetDegree<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
// Optional configuration
private boolean reduceOnSourceId = false;
private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false);
private int parallelism = PARALLELISM_DEFAULT;
......@@ -56,7 +57,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
* @return this
*/
public EdgeTargetDegree<K, VV, EV> setReduceOnSourceId(boolean reduceOnSourceId) {
this.reduceOnSourceId = reduceOnSourceId;
this.reduceOnSourceId.set(reduceOnSourceId);
return this;
}
......@@ -77,12 +78,33 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
}
@Override
public DataSet<Edge<K, Tuple2<EV, LongValue>>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return EdgeTargetDegree.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeTargetDegree.class.isAssignableFrom(other.getClass())) {
return false;
}
EdgeTargetDegree rhs = (EdgeTargetDegree) other;
reduceOnSourceId.mergeWith(rhs.reduceOnSourceId);
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// t, d(t)
DataSet<Vertex<K, LongValue>> vertexDegrees = input
.run(new VertexDegree<K, VV, EV>()
.setReduceOnTargetId(!reduceOnSourceId)
.setReduceOnTargetId(!reduceOnSourceId.get())
.setParallelism(parallelism));
// s, t, d(t)
......
......@@ -20,10 +20,11 @@ package org.apache.flink.graph.asm.degree.annotate.undirected;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
......@@ -41,12 +42,12 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexDegree<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
// Optional configuration
private boolean includeZeroDegreeVertices = false;
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
private boolean reduceOnTargetId = false;
private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
private int parallelism = PARALLELISM_DEFAULT;
......@@ -60,7 +61,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
* @return this
*/
public VertexDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
this.includeZeroDegreeVertices = includeZeroDegreeVertices;
this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
return this;
}
......@@ -75,7 +76,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
* @return this
*/
public VertexDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
this.reduceOnTargetId = reduceOnTargetId;
this.reduceOnTargetId.set(reduceOnTargetId);
return this;
}
......@@ -96,9 +97,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
}
@Override
public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return VertexDegree.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! VertexDegree.class.isAssignableFrom(other.getClass())) {
return false;
}
VertexDegree rhs = (VertexDegree) other;
// verify that configurations can be merged
if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
return false;
}
// merge configurations
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
throws Exception {
MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId ?
MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId.get() ?
new MapEdgeToTargetId<K, EV>() : new MapEdgeToSourceId<K, EV>();
// v
......@@ -115,8 +146,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
.setParallelism(parallelism)
.name("Degree count");
if (includeZeroDegreeVertices) {
degree = input.getVertices()
if (includeZeroDegreeVertices.get()) {
degree = input
.getVertices()
.leftOuterJoin(degree)
.where(0)
.equalTo(0)
......
......@@ -27,9 +27,10 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFir
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
......@@ -46,15 +47,15 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class MaximumDegree<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
// Required configuration
private long maximumDegree;
// Optional configuration
private boolean reduceOnTargetId = false;
private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
private boolean broadcastHighDegreeVertices = false;
private OptionalBoolean broadcastHighDegreeVertices = new OptionalBoolean(false, false);
private int parallelism = PARALLELISM_DEFAULT;
......@@ -79,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
* @return this
*/
public MaximumDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
this.reduceOnTargetId = reduceOnTargetId;
this.reduceOnTargetId.set(reduceOnTargetId);
return this;
}
......@@ -96,7 +97,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
* @return this
*/
public MaximumDegree<K, VV, EV> setBroadcastHighDegreeVertices(boolean broadcastHighDegreeVertices) {
this.broadcastHighDegreeVertices = broadcastHighDegreeVertices;
this.broadcastHighDegreeVertices.set(broadcastHighDegreeVertices);
return this;
}
......@@ -113,6 +114,36 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
return this;
}
@Override
protected String getAlgorithmName() {
return MaximumDegree.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
Preconditions.checkNotNull(other);
if (! MaximumDegree.class.isAssignableFrom(other.getClass())) {
return false;
}
MaximumDegree rhs = (MaximumDegree) other;
// verify that configurations can be merged
if (maximumDegree != rhs.maximumDegree) {
return false;
}
// merge configurations
reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices);
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
/*
* Implementation notes:
*
......@@ -121,12 +152,12 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
*/
@Override
public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
throws Exception {
// u, d(u)
DataSet<Vertex<K, LongValue>> vertexDegree = input
.run(new VertexDegree<K, VV, EV>()
.setReduceOnTargetId(reduceOnTargetId)
.setReduceOnTargetId(reduceOnTargetId.get())
.setParallelism(parallelism));
// u, d(u) if d(u) > maximumDegree
......@@ -135,7 +166,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
.setParallelism(parallelism)
.name("Filter high-degree vertices");
JoinHint joinHint = broadcastHighDegreeVertices ? JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND;
JoinHint joinHint = broadcastHighDegreeVertices.get() ? JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND;
// Vertices
DataSet<Vertex<K, VV>> vertices = input
......@@ -151,17 +182,17 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
DataSet<Edge<K, EV>> edges = input
.getEdges()
.leftOuterJoin(highDegreeVertices, joinHint)
.where(reduceOnTargetId ? 1 : 0)
.where(reduceOnTargetId.get() ? 1 : 0)
.equalTo(0)
.with(new ProjectEdge<K, EV>())
.setParallelism(parallelism)
.name("Project low-degree edges by " + (reduceOnTargetId ? "target" : "source"))
.name("Project low-degree edges by " + (reduceOnTargetId.get() ? "target" : "source"))
.leftOuterJoin(highDegreeVertices, joinHint)
.where(reduceOnTargetId ? 0 : 1)
.where(reduceOnTargetId.get() ? 0 : 1)
.equalTo(0)
.with(new ProjectEdge<K, EV>())
.setParallelism(parallelism)
.name("Project low-degree edges by " + (reduceOnTargetId ? "source" : "target"));
.name("Project low-degree edges by " + (reduceOnTargetId.get() ? "source" : "target"));
// Graph
return Graph.fromDataSet(vertices, edges, input.getContext());
......
......@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.Preconditions;
......@@ -38,7 +38,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
// Optional configuration
private int parallelism = PARALLELISM_DEFAULT;
......@@ -59,7 +59,27 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
}
@Override
public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return Simplify.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
Preconditions.checkNotNull(other);
if (! Simplify.class.isAssignableFrom(other.getClass())) {
return false;
}
Simplify rhs = (Simplify) other;
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
throws Exception {
// Edges
DataSet<Edge<K, EV>> edges = input
......
......@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
......@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
// Required configuration
private boolean clipAndFlip;
......@@ -77,7 +77,35 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
}
@Override
public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return Simplify.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
Preconditions.checkNotNull(other);
if (! Simplify.class.isAssignableFrom(other.getClass())) {
return false;
}
Simplify rhs = (Simplify) other;
// verify that configurations can be merged
if (clipAndFlip != rhs.clipAndFlip) {
return false;
}
// merge configurations
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
throws Exception {
// Edges
DataSet<Edge<K, EV>> edges = input
......
......@@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.translate;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
......@@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues
* @param <NEW> new edge value type
*/
public class TranslateEdgeValues<K, VV, OLD, NEW>
implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> {
extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> {
// Required configuration
private TranslateFunction<OLD,NEW> translator;
......@@ -71,7 +71,36 @@ implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> {
}
@Override
public Graph<K, VV, NEW> run(Graph<K, VV, OLD> input) throws Exception {
protected String getAlgorithmName() {
return TranslateEdgeValues.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
Preconditions.checkNotNull(other);
if (! TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {
return false;
}
TranslateEdgeValues rhs = (TranslateEdgeValues) other;
// verify that configurations can be merged
if (translator != rhs.translator) {
return false;
}
// merge configurations
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public Graph<K, VV, NEW> runInternal(Graph<K, VV, OLD> input)
throws Exception {
DataSet<Edge<K, NEW>> translatedEdges = translateEdgeValues(input.getEdges(), translator, parallelism);
return Graph.fromDataSet(input.getVertices(), translatedEdges, input.getContext());
......
......@@ -21,8 +21,8 @@ package org.apache.flink.graph.asm.translate;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
......@@ -38,7 +38,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
* @param <EV> edge value type
*/
public class TranslateGraphIds<OLD, NEW, VV, EV>
implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> {
extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, EV> {
// Required configuration
private TranslateFunction<OLD,NEW> translator;
......@@ -73,7 +73,36 @@ implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> {
}
@Override
public Graph<NEW, VV, EV> run(Graph<OLD, VV, EV> input) throws Exception {
protected String getAlgorithmName() {
return TranslateGraphIds.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
Preconditions.checkNotNull(other);
if (! TranslateGraphIds.class.isAssignableFrom(other.getClass())) {
return false;
}
TranslateGraphIds rhs = (TranslateGraphIds) other;
// verify that configurations can be merged
if (translator != rhs.translator) {
return false;
}
// merge configurations
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public Graph<NEW, VV, EV> runInternal(Graph<OLD, VV, EV> input)
throws Exception {
// Vertices
DataSet<Vertex<NEW, VV>> translatedVertices = translateVertexIds(input.getVertices(), translator, parallelism);
......
......@@ -20,8 +20,8 @@ package org.apache.flink.graph.asm.translate;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
......@@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexValu
* @param <EV> edge value type
*/
public class TranslateVertexValues<K, OLD, NEW, EV>
implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> {
extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> {
// Required configuration
private TranslateFunction<OLD, NEW> translator;
......@@ -71,7 +71,36 @@ implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> {
}
@Override
public Graph<K, NEW, EV> run(Graph<K, OLD, EV> input) throws Exception {
protected String getAlgorithmName() {
return TranslateVertexValues.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
Preconditions.checkNotNull(other);
if (! TranslateVertexValues.class.isAssignableFrom(other.getClass())) {
return false;
}
TranslateVertexValues rhs = (TranslateVertexValues) other;
// verify that configurations can be merged
if (translator != rhs.translator) {
return false;
}
// merge configurations
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public Graph<K, NEW, EV> runInternal(Graph<K, OLD, EV> input)
throws Exception {
DataSet<Vertex<K, NEW>> translatedVertices = translateVertexValues(input.getVertices(), translator, parallelism);
return Graph.fromDataSet(translatedVertices, input.getEdges(), input.getContext());
......
......@@ -25,12 +25,12 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
......@@ -55,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
// Optional configuration
private int littleParallelism = PARALLELISM_DEFAULT;
......@@ -74,6 +74,25 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
return this;
}
@Override
protected String getAlgorithmName() {
return LocalClusteringCoefficient.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
return false;
}
LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
return true;
}
/*
* Implementation notes:
......@@ -86,12 +105,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
*/
@Override
public DataSet<Result<K>> run(Graph<K, VV, EV> input)
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// u, v, w, bitmask
DataSet<TriangleListing.Result<K>> triangles = input
.run(new TriangleListing<K,VV,EV>()
.setSortTriangleVertices(false)
.setLittleParallelism(littleParallelism));
// u, edge count
......
......@@ -33,9 +33,11 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeOrder;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.Collector;
......@@ -60,10 +62,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
// Optional configuration
private boolean sortTriangleVertices = false;
private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
private int littleParallelism = PARALLELISM_DEFAULT;
......@@ -75,7 +77,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
* @return this
*/
public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
this.sortTriangleVertices = sortTriangleVertices;
this.sortTriangleVertices.set(sortTriangleVertices);
return this;
}
......@@ -95,6 +97,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
return this;
}
@Override
protected String getAlgorithmName() {
return TriangleListing.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
return false;
}
TriangleListing rhs = (TriangleListing) other;
sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
return true;
}
/*
* Implementation notes:
*
......@@ -106,7 +129,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
*/
@Override
public DataSet<Result<K>> run(Graph<K, VV, EV> input)
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// u, v, bitmask where u < v
DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
......@@ -151,7 +174,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
.setParallelism(littleParallelism)
.name("Triangle listing");
if (sortTriangleVertices) {
if (sortTriangleVertices.get()) {
triangles = triangles
.map(new SortTriangleVertices<K>())
.name("Sort triangle vertices");
......
......@@ -26,11 +26,11 @@ import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
......@@ -55,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
// Optional configuration
private int littleParallelism = PARALLELISM_DEFAULT;
......@@ -75,6 +75,26 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
return this;
}
@Override
protected String getAlgorithmName() {
return LocalClusteringCoefficient.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
return false;
}
LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
return true;
}
/*
* Implementation notes:
*
......@@ -86,12 +106,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
*/
@Override
public DataSet<Result<K>> run(Graph<K, VV, EV> input)
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// u, v, w
DataSet<Tuple3<K,K,K>> triangles = input
.run(new TriangleListing<K,VV,EV>()
.setSortTriangleVertices(false)
.setLittleParallelism(littleParallelism));
// u, 1
......
......@@ -32,8 +32,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
......@@ -62,10 +63,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple3<K, K, K>> {
// Optional configuration
private boolean sortTriangleVertices = false;
private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
private int littleParallelism = PARALLELISM_DEFAULT;
......@@ -77,7 +78,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
* @return this
*/
public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
this.sortTriangleVertices = sortTriangleVertices;
this.sortTriangleVertices.set(sortTriangleVertices);
return this;
}
......@@ -97,6 +98,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
return this;
}
@Override
protected String getAlgorithmName() {
return TriangleListing.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
return false;
}
TriangleListing rhs = (TriangleListing) other;
sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
return true;
}
/*
* Implementation notes:
*
......@@ -108,7 +130,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
*/
@Override
public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input)
public DataSet<Tuple3<K, K, K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// u, v where u < v
DataSet<Tuple2<K, K>> filteredByID = input
......@@ -145,7 +167,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
.setParallelism(littleParallelism)
.name("Triangle listing");
if (sortTriangleVertices) {
if (sortTriangleVertices.get()) {
triangles = triangles
.map(new SortTriangleVertices<K>())
.name("Sort triangle vertices");
......
......@@ -36,9 +36,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.link_analysis.HITS.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
......@@ -62,7 +63,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class HITS<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<HITS.Result<K>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
private static final String CHANGE_IN_SCORES = "change in scores";
......@@ -128,7 +129,32 @@ implements GraphAlgorithm<K, VV, EV, DataSet<HITS.Result<K>>> {
}
@Override
public DataSet<Result<K>> run(Graph<K, VV, EV> input)
protected String getAlgorithmName() {
return HITS.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! HITS.class.isAssignableFrom(other.getClass())) {
return false;
}
HITS rhs = (HITS) other;
// merge configurations
maxIterations = Math.max(maxIterations, rhs.maxIterations);
convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
parallelism = Math.min(parallelism, rhs.parallelism);
return true;
}
@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
DataSet<Tuple2<K, K>> edges = input
.getEdges()
......
......@@ -33,11 +33,11 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.FloatValue;
import org.apache.flink.types.IntValue;
......@@ -71,7 +71,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
private static final int GROUP_SIZE = 64;
......@@ -127,6 +127,35 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
return this;
}
@Override
protected String getAlgorithmName() {
return AdamicAdar.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! AdamicAdar.class.isAssignableFrom(other.getClass())) {
return false;
}
AdamicAdar rhs = (AdamicAdar) other;
// verify that configurations can be merged
if (minimumRatio != rhs.minimumRatio ||
minimumScore != rhs.minimumScore) {
return false;
}
// merge configurations
littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
return true;
}
/*
* Implementation notes:
*
......@@ -136,7 +165,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
*/
@Override
public DataSet<Result<K>> run(Graph<K, VV, EV> input)
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s, d(s), 1/log(d(s))
DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input
......
......@@ -28,10 +28,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
......@@ -61,7 +61,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
public static final int DEFAULT_GROUP_SIZE = 64;
......@@ -153,6 +153,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
return this;
}
@Override
protected String getAlgorithmName() {
return JaccardIndex.class.getName();
}
@Override
protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
Preconditions.checkNotNull(other);
if (! JaccardIndex.class.isAssignableFrom(other.getClass())) {
return false;
}
JaccardIndex rhs = (JaccardIndex) other;
// verify that configurations can be merged
if (unboundedScores != rhs.unboundedScores ||
minimumScoreNumerator != rhs.minimumScoreNumerator ||
minimumScoreDenominator != rhs.minimumScoreDenominator ||
maximumScoreNumerator != rhs.maximumScoreNumerator ||
maximumScoreDenominator != rhs.maximumScoreDenominator) {
return false;
}
// merge configurations
groupSize = Math.max(groupSize, rhs.groupSize);
littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
return true;
}
/*
* Implementation notes:
*
......@@ -162,7 +195,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
*/
@Override
public DataSet<Result<K>> run(Graph<K, VV, EV> input)
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s, t, d(t)
DataSet<Edge<K, Tuple2<EV, LongValue>>> neighborDegree = input
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.utils.proxy;
import javassist.util.proxy.MethodFilter;
import javassist.util.proxy.MethodHandler;
import javassist.util.proxy.ProxyFactory;
import javassist.util.proxy.ProxyObject;
import org.objenesis.ObjenesisStd;
import java.lang.reflect.Method;
/**
* Wraps an object with a proxy delegate whose method handler invokes all
* method calls on the wrapped object. This object can be later replaced.
*
* @param <X>
*/
public class Delegate<X> {
private X obj;
private X proxy = null;
/**
* Set the initial delegated object.
*
* @param obj delegated object
*/
public Delegate(X obj) {
setObject(obj);
}
/**
* Change the delegated object.
*
* @param obj delegated object
*/
public void setObject(X obj) {
this.obj = obj;
}
/**
* Instantiates and returns a proxy object which subclasses the
* delegated object. The proxy's method handler invokes all methods
* on the delegated object that is set at the time of invocation.
*
* @return delegating proxy
*/
@SuppressWarnings("unchecked")
public X getProxy() {
if (proxy != null) {
return proxy;
}
ProxyFactory factory = new ProxyFactory();
factory.setSuperclass(obj.getClass());
// create the class and instantiate an instance without calling a constructor
Class<? extends X> proxyClass = factory.createClass(new MethodFilter() {
@Override
public boolean isHandled(Method method) {
return true;
}
});
proxy = new ObjenesisStd().newInstance(proxyClass);
// create and set a handler to invoke all method calls on the delegated object
((ProxyObject) proxy).setHandler(new MethodHandler() {
@Override
public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable {
// method visibility may be restricted
thisMethod.setAccessible(true);
return thisMethod.invoke(obj, args);
}
});
return proxy;
}
}
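A brief standalone sketch, not part of the commit, of how Delegate behaves: the proxy keeps forwarding calls to whichever object is currently set. The use of ArrayList here is purely illustrative.
// Illustrative use of Delegate with ArrayList; not part of the commit.
import java.util.ArrayList;

import org.apache.flink.graph.utils.proxy.Delegate;

public class DelegateSketch {

    public static void main(String[] args) {
        ArrayList<String> first = new ArrayList<>();
        first.add("first");

        Delegate<ArrayList<String>> delegate = new Delegate<>(first);
        ArrayList<String> proxy = delegate.getProxy();

        // forwarded to the initial object
        System.out.println(proxy.get(0));   // prints "first"

        ArrayList<String> second = new ArrayList<>();
        second.add("second");

        // replace the delegated object; the proxy instance is unchanged
        delegate.setObject(second);
        System.out.println(proxy.get(0));   // prints "second"
    }
}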
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.utils.proxy;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
* type {@code T}. A {@code GraphAlgorithmDelegatingDataSet} wraps the resultant
* {@link DataSet} with a delegating proxy object. The delegated object can be
* replaced when the same algorithm is run on the same input with a mergeable
* configuration. This allows algorithms to be composed of implicitly reusable
* algorithms without publicly sharing intermediate {@link DataSet}s.
*
* @param <K> ID type
* @param <VV> vertex value type
* @param <EV> edge value type
* @param <T> output type
*/
public abstract class GraphAlgorithmDelegatingDataSet<K, VV, EV, T>
implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
// each algorithm and input pair may map to multiple configurations
private static Map<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>> cache =
Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>>());
private Graph<K,VV,EV> input;
private Delegate<DataSet<T>> delegate;
/**
* Algorithms are identified by name rather than by class to allow subclassing.
*
* @return name of the algorithm, which may be shared by multiple classes
* implementing the same algorithm and generating the same output
*/
protected abstract String getAlgorithmName();
/**
* An algorithm must first test whether the configurations can be merged
* before merging individual fields.
*
* @param other the algorithm with which to compare and merge
* @return true if and only if configuration has been merged and the
* algorithm's output can be reused
*/
protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other);
/**
* The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}.
*
* @param input the input graph
* @return the algorithm's output
* @throws Exception
*/
protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) throws Exception;
@Override
public final int hashCode() {
return new HashCodeBuilder(17, 37)
.append(input)
.append(getAlgorithmName())
.toHashCode();
}
@Override
public final boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (! GraphAlgorithmDelegatingDataSet.class.isAssignableFrom(obj.getClass())) {
return false;
}
GraphAlgorithmDelegatingDataSet rhs = (GraphAlgorithmDelegatingDataSet) obj;
return new EqualsBuilder()
.append(input, rhs.input)
.append(getAlgorithmName(), rhs.getAlgorithmName())
.isEquals();
}
@Override
@SuppressWarnings("unchecked")
public final DataSet<T> run(Graph<K, VV, EV> input)
throws Exception {
this.input = input;
if (cache.containsKey(this)) {
for (GraphAlgorithmDelegatingDataSet<K, VV, EV, T> other : cache.get(this)) {
if (mergeConfiguration(other)) {
// configuration has been merged so generate new output
DataSet<T> output = runInternal(input);
// update the delegated object and reuse the existing delegate
other.delegate.setObject(output);
delegate = other.delegate;
return delegate.getProxy();
}
}
}
// no mergeable configuration found so generate new output
DataSet<T> output = runInternal(input);
// create a new delegate to wrap the algorithm output
delegate = new Delegate<>(output);
// cache this result
if (cache.containsKey(this)) {
cache.get(this).add(this);
} else {
cache.put(this, new ArrayList<>(Collections.singletonList(this)));
}
return delegate.getProxy();
}
}
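To make the contract above concrete, here is a hypothetical minimal subclass; the class name VertexList and its single configuration option are illustrative only and not part of Gelly. It names itself, merges a parallelism setting, and defers the actual work to runInternal.
// Hypothetical subclass for illustration; not part of Gelly.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

public class VertexList<K, VV, EV>
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, VV>> {

    // Optional configuration
    private int parallelism = PARALLELISM_DEFAULT;

    public VertexList<K, VV, EV> setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    @Override
    protected String getAlgorithmName() {
        return VertexList.class.getName();
    }

    @Override
    protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
        if (! VertexList.class.isAssignableFrom(other.getClass())) {
            return false;
        }

        // parallelism is always mergeable: keep the more restrictive value
        parallelism = Math.min(parallelism, ((VertexList) other).parallelism);

        return true;
    }

    @Override
    protected DataSet<Vertex<K, VV>> runInternal(Graph<K, VV, EV> input)
            throws Exception {
        // a real algorithm would construct its operators here, applying the
        // merged parallelism; returning the vertex set keeps the sketch short
        return input.getVertices();
    }
}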
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.utils.proxy;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
* type {@code T}. A {@code GraphAlgorithmDelegatingGraph} wraps the resultant
* {@link Graph} with a delegating proxy object. The delegated object can be
* replaced when the same algorithm is run on the same input with a mergeable
* configuration. This allows algorithms to be composed of implicitly reusable
* algorithms without publicly sharing intermediate {@link DataSet}s.
*
* @param <IN_K> input ID type
* @param <IN_VV> input vertex value type
* @param <IN_EV> input edge value type
* @param <OUT_K> output ID type
* @param <OUT_VV> output vertex value type
* @param <OUT_EV> output edge value type
*/
public abstract class GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV>
implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> {
// each algorithm and input pair may map to multiple configurations
private static Map<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>> cache =
Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>>());
private Graph<IN_K, IN_VV, IN_EV> input;
private Delegate<DataSet<Vertex<OUT_K, OUT_VV>>> verticesDelegate;
private Delegate<DataSet<Edge<OUT_K, OUT_EV>>> edgesDelegate;
/**
* Algorithms are identified by name rather than by class to allow subclassing.
*
* @return name of the algorithm, which may be shared by multiple classes
* implementing the same algorithm and generating the same output
*/
protected abstract String getAlgorithmName();
/**
* An algorithm must first test whether the configurations can be merged
* before merging individual fields.
*
* @param other the algorithm with which to compare and merge
* @return true if and only if configuration has been merged and the
* algorithm's output can be reused
*/
protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other);
/**
* The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}.
*
* @param input the input graph
* @return the algorithm's output
* @throws Exception
*/
protected abstract Graph<OUT_K, OUT_VV, OUT_EV> runInternal(Graph<IN_K, IN_VV, IN_EV> input) throws Exception;
@Override
public final int hashCode() {
return new HashCodeBuilder(17, 37)
.append(input)
.append(getAlgorithmName())
.toHashCode();
}
@Override
public final boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (! GraphAlgorithmDelegatingGraph.class.isAssignableFrom(obj.getClass())) {
return false;
}
GraphAlgorithmDelegatingGraph rhs = (GraphAlgorithmDelegatingGraph) obj;
return new EqualsBuilder()
.append(input, rhs.input)
.append(getAlgorithmName(), rhs.getAlgorithmName())
.isEquals();
}
@Override
@SuppressWarnings("unchecked")
public final Graph<OUT_K, OUT_VV, OUT_EV> run(Graph<IN_K, IN_VV, IN_EV> input)
throws Exception {
this.input = input;
if (cache.containsKey(this)) {
for (GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) {
if (mergeConfiguration(other)) {
// configuration has been merged so generate new output
Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
// update the delegated object and reuse the existing delegate
other.verticesDelegate.setObject(output.getVertices());
verticesDelegate = other.verticesDelegate;
other.edgesDelegate.setObject(output.getEdges());
edgesDelegate = other.edgesDelegate;
return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext());
}
}
}
// no mergeable configuration found so generate new output
Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
// create a new delegate to wrap the algorithm output
verticesDelegate = new Delegate<>(output.getVertices());
edgesDelegate = new Delegate<>(output.getEdges());
// cache this result
if (cache.containsKey(this)) {
cache.get(this).add(this);
} else {
cache.put(this, new ArrayList<>(Collections.singletonList(this)));
}
return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.utils.proxy;
import org.apache.flink.graph.GraphAlgorithm;
/**
* A multi-state boolean.
* <br/>
* This class is used by {@link GraphAlgorithm} configuration options to set a
* default value which can be overwritten. A separate default value is returned
* when merged algorithm configurations conflict.
*/
public class OptionalBoolean {
protected enum State {
UNSET,
FALSE,
TRUE,
CONFLICTING
}
private State state = State.UNSET;
private final boolean valueIfUnset;
private final boolean valueIfConflicting;
/**
* An {@code OptionalBoolean} has four possible states: true, false, "unset",
* and "conflicting". The state is set explicitly or by merging with a set value,
* returns to unset only explicitly, and becomes conflicting when true is merged
* with false.
*
* @param valueIfUnset the value to return when the object's state is unset
* @param valueIfConflicting the value to return when the object's state is conflicting
*/
public OptionalBoolean(boolean valueIfUnset, boolean valueIfConflicting) {
this.valueIfUnset = valueIfUnset;
this.valueIfConflicting = valueIfConflicting;
}
/**
* Get the boolean state.
*
* @return boolean state
*/
public boolean get() {
switch (state) {
case UNSET:
return valueIfUnset;
case FALSE:
return false;
case TRUE:
return true;
case CONFLICTING:
return valueIfConflicting;
default:
throw new RuntimeException("Unknown state");
}
}
/**
* Set the boolean state.
*
* @param value boolean state
*/
public void set(boolean value) {
this.state = (value ? State.TRUE : State.FALSE);
}
/**
* Reset to the unset state.
*/
public void unset() {
this.state = State.UNSET;
}
/**
* Get the actual state.
*
* @return actual state
*/
protected State getState() {
return state;
}
/**
* Two objects conflict when one state is true and the other is false, or when
* either state is already conflicting.
*
* @param other object to test with
* @return whether the objects conflict
*/
public boolean conflictsWith(OptionalBoolean other) {
return state == State.CONFLICTING
|| other.state == State.CONFLICTING
|| (state == State.TRUE && other.state == State.FALSE)
|| (state == State.FALSE && other.state == State.TRUE);
}
/**
* State transitions:
* if the states are the same or the other state is unset then no change
* if this state is unset then adopt the other state
* otherwise the states differ and this state becomes conflicting
*
* @param other object from which to merge state
*/
public void mergeWith(OptionalBoolean other) {
if (state == other.state) {
// no change in state
} else if (state == State.UNSET) {
state = other.state;
} else if (other.state == State.UNSET) {
// no change in state
} else {
state = State.CONFLICTING;
}
}
}
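A short standalone sketch, not part of the commit, of the merge semantics documented above: an unset value adopts the other side's state, while true merged with false falls back to the conflicting default.
// Illustrative use of OptionalBoolean; not part of the commit.
import org.apache.flink.graph.utils.proxy.OptionalBoolean;

public class OptionalBooleanSketch {

    public static void main(String[] args) {
        // unset falls back to false; conflicting falls back to true
        OptionalBoolean mine = new OptionalBoolean(false, true);
        OptionalBoolean theirs = new OptionalBoolean(false, true);

        System.out.println(mine.get());     // false (unset default)

        mine.set(true);
        theirs.set(false);

        // merging true with false yields the conflicting state,
        // so the configured fallback value is returned
        mine.mergeWith(theirs);
        System.out.println(mine.get());     // true (conflicting default)

        // an unset value simply adopts the other side's state
        OptionalBoolean fresh = new OptionalBoolean(false, true);
        fresh.mergeWith(theirs);
        System.out.println(fresh.get());    // false
    }
}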
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.utils.proxy;
import org.apache.flink.graph.utils.proxy.OptionalBoolean.State;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class OptionalBooleanTest {
private OptionalBoolean u;
private OptionalBoolean f;
private OptionalBoolean t;
private OptionalBoolean c;
@Before
public void setup() {
u = new OptionalBoolean(false, true);
f = new OptionalBoolean(false, true);
t = new OptionalBoolean(false, true);
c = new OptionalBoolean(false, true);
f.set(false);
t.set(true);
c.set(true);
c.mergeWith(f);
}
@Test
public void testConflictsWith()
throws Exception {
// unset, unset
assertFalse(u.conflictsWith(u));
// unset, false
assertFalse(u.conflictsWith(f));
// unset, true
assertFalse(u.conflictsWith(t));
// unset, conflicting
assertTrue(u.conflictsWith(c));
// false, unset
assertFalse(f.conflictsWith(u));
// false, false
assertFalse(f.conflictsWith(f));
// false, true
assertTrue(f.conflictsWith(t));
// false, conflicting
assertTrue(f.conflictsWith(c));
// true, unset
assertFalse(t.conflictsWith(u));
// true, false
assertTrue(t.conflictsWith(f));
// true, true
assertFalse(t.conflictsWith(t));
// true, conflicting
assertTrue(t.conflictsWith(c));
// conflicting, unset
assertTrue(c.conflictsWith(u));
// conflicting, false
assertTrue(c.conflictsWith(f));
// conflicting, true
assertTrue(c.conflictsWith(t));
// conflicting, conflicting
assertTrue(c.conflictsWith(c));
}
@Test
public void testMergeWith()
throws Exception {
// unset, unset => unset
u.mergeWith(u);
assertEquals(State.UNSET, u.getState());
// unset, false => false
u.mergeWith(f);
assertEquals(State.FALSE, u.getState());
u.unset();
// unset, true => true
u.mergeWith(t);
assertEquals(State.TRUE, u.getState());
u.unset();
// unset, conflicting => conflicting
u.mergeWith(c);
assertEquals(State.CONFLICTING, u.getState());
u.unset();
// false, unset => false
f.mergeWith(u);
assertEquals(State.FALSE, f.getState());
// false, false => false
f.mergeWith(f);
assertEquals(State.FALSE, f.getState());
// false, true => conflicting
f.mergeWith(t);
assertEquals(State.CONFLICTING, f.getState());
f.set(false);
// false, conflicting => conflicting
f.mergeWith(c);
assertEquals(State.CONFLICTING, f.getState());
f.set(false);
// true, unset => true
t.mergeWith(u);
assertEquals(State.TRUE, t.getState());
// true, false => conflicting
t.mergeWith(f);
assertEquals(State.CONFLICTING, t.getState());
t.set(true);
// true, true => true
t.mergeWith(t);
assertEquals(State.TRUE, t.getState());
// true, conflicting => conflicting
t.mergeWith(c);
assertEquals(State.CONFLICTING, t.getState());
t.set(true);
// conflicting, unset => conflicting
c.mergeWith(u);
assertEquals(State.CONFLICTING, c.getState());
// conflicting, false => conflicting
c.mergeWith(f);
assertEquals(State.CONFLICTING, c.getState());
// conflicting, true => conflicting
c.mergeWith(t);
assertEquals(State.CONFLICTING, c.getState());
// conflicting, conflicting => conflicting
c.mergeWith(c);
assertEquals(State.CONFLICTING, c.getState());
}
}