提交 1dfbcbaa 编写于 作者: A andralungu 提交者: vasia

[FLINK-1523] [gelly] Added Vertex Centric Configuration Tests

This commit squashes the following:

Pratially addressed inline comments

Added test for removal of a non-SP-edge
上级 585d27d0
......@@ -352,11 +352,11 @@ When the aggregation computation does not require access to the vertex value (fo
Vertex-centric Iterations
-----------
Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations.
Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value
based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`.
This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values:
Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations. Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`. This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values:
A vertex-centric iteration can be extended with information such as the total number of vertices, the in degree and out degree.
Additionally, the neighborhood type (in/out/all) over which to run the vertex-centric iteration can be specified. By default, the updates from the in-neighbors are used to modify the current vertex's state and messages are sent to out-neighbors.
{% highlight java %}
Graph<Long, Double, Double> graph = ...
......@@ -388,6 +388,14 @@ all aggregates globally once per superstep and makes them available in the next
* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
* <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
The number of vertices can then be accessed in the vertex update function and in the messaging function using the `getNumberOfVertices()` method.
* <strong>Degrees</strong>: Accessing the in/out degree for a vertex within an iteration. This property can be set using the `setOptDegrees()` method.
The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex using `vertex.getInDegree()` or `vertex.getOutDegree()`.
* <strong>Messaging Direction</strong>: By default, a vertex sends messages to its out-neighbors and updates its value based on messages received from its in-neighbors. This configuration option allows users to change the messaging direction to either `EdgeDirection.IN`, `EdgeDirection.OUT`, `EdgeDirection.ALL`. The messaging direction also dictates the update direction which would be `EdgeDirection.OUT`, `EdgeDirection.IN` and `EdgeDirection.ALL`, respectively. This property can be set using the `setDirection()` method.
{% highlight java %}
Graph<Long, Double, Double> graph = ...
......@@ -438,60 +446,35 @@ public static final class Messenger extends MessagingFunction {...}
{% endhighlight %}
### Vertex-Centric Iteration Extensions
A vertex-centric iteration can be extended with information such as the total number of vertices,
the in degree and out degree. Additionally, the neighborhood type (in/out/all) over which to
run the vertex-centric iteration can be specified. By default, the updates from the in-neighbors are used
to modify the current vertex's state and messages are sent to out-neighbors.
In order to activate these options, the following parameters must be set to true:
<strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property
can be set using the `setOptNumVertices()` method.
The number of vertices can then be accessed in the vertex update function and in the messaging function
using the `getNumberOfVertices()` method.
<strong>Degrees</strong>: Accessing the in/out degree for a vertex within an iteration. This property can be set
using the `setOptDegrees()` method.
The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex
using `vertex.getInDegree()` or `vertex.getOutDegree()`.
<strong>Messaging Direction</strong>: The direction in which messages are sent. This can be either EdgeDirection.IN,
EdgeDirection.OUT, EdgeDirection.ALL. The messaging direction also dictates the update direction which would be
EdgeDirection.OUT, EdgeDirection.IN and EdgeDirection.ALL, respectively. This property can be set using the
`setDirection()` method.
The following example illustrates the usage of the degree as well as the number of vertices options.
{% highlight java %}
Graph<Long, Double, Double> graph = ...
// create the vertex-centric iteration
VertexCentricIteration<Long, Double, Double, Double> iteration =
graph.createVertexCentricIteration(
new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
Graph<Long, Double, Double> graph = ...
// set the messaging direction
iteration.setDirection(EdgeDirection.IN);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
// set the number of vertices option to true
iteration.setOptNumVertices(true);
parameters.setOptNumVertices(true);
// set the degree option to true
iteration.setOptDegrees(true);
parameters.setOptDegrees(true);
// run the computation
graph.runVertexCentricIteration(iteration);
// run the vertex-centric iteration, also passing the configuration parameters
Graph<Long, Double, Double> result =
graph.runVertexCentricIteration(
new VertexUpdater(), new Messenger(), maxIterations, parameters);
// user-defined functions
public static final class VertexDistanceUpdater {
public static final class VertexUpdater {
...
// get the number of vertices
long numVertices = getNumberOfVertices();
...
}
public static final class MinDistanceMessenger {
public static final class Messenger {
...
// decrement the number of out-degrees
outDegree = vertex.getOutDegree() - 1;
......@@ -500,6 +483,49 @@ public static final class MinDistanceMessenger {
{% endhighlight %}
The following example illustrates the usage of the edge direction option. Vertices update their values to contain a list of all their in-neighbors.
{% highlight java %}
Graph<Long, HashSet<Long>, Double> graph = ...
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
// set the messaging direction
parameters.setDirection(EdgeDirection.IN);
// run the vertex-centric iteration, also passing the configuration parameters
DataSet<Vertex<Long, HashSet<Long>>> result =
graph.runVertexCentricIteration(
new VertexUpdater(), new Messenger(), maxIterations, parameters)
.getVertices();
// user-defined functions
public static final class VertexUpdater {
@Override
public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception {
vertex.getValue().clear();
for(long msg : messages) {
vertex.getValue().add(msg);
}
setNewVertexValue(vertex.getValue());
}
}
public static final class Messenger {
@Override
public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
for (Edge<Long, Long> edge : getEdges()) {
sendMessageTo(edge.getSource(), vertex.getId());
}
}
}
{% endhighlight %}
[Back to top](#top)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph;
/**
* The exception that gets thrown when the degree option or the number of vertices
* option in {@link org.apache.flink.graph.spargel.IterationConfiguration} was not set.
*/
public class InaccessibleMethodException extends Exception {
public InaccessibleMethodException() {}
public InaccessibleMethodException(String text) {
super(text);
}
}
......@@ -35,8 +35,8 @@ public class Vertex<K, V> extends Tuple2<K, V> {
private Long outDegree;
public Vertex(){
inDegree = 0L;
outDegree = 0L;
inDegree = -1L;
outDegree = -1L;
}
public Vertex(K k, V val) {
......@@ -62,7 +62,11 @@ public class Vertex<K, V> extends Tuple2<K, V> {
this.f1 = val;
}
public Long getInDegree() {
public Long getInDegree() throws Exception{
if(inDegree == -1) {
throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
"call iterationConfiguration.setOptDegrees(true).");
}
return inDegree;
}
......@@ -70,7 +74,11 @@ public class Vertex<K, V> extends Tuple2<K, V> {
this.inDegree = inDegree;
}
public Long getOutDegree() {
public Long getOutDegree() throws Exception{
if(outDegree == -1) {
throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
"call iterationConfiguration.setOptDegrees(true).");
}
return outDegree;
}
......
......@@ -20,11 +20,8 @@ package org.apache.flink.graph.example;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
......@@ -34,12 +31,21 @@ import org.apache.flink.graph.spargel.IterationConfiguration;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
/**
* Incremental Single Sink Shortest Paths Example.
* Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
* upon edge removal.
*
* This example illustrates the usage of vertex-centric iteration's
* messaging direction configuration option.
*
* The program takes as input the resulted graph after a SSSP computation,
* an edge to be removed and the initial graph(i.e. before SSSP was computed).
* In the following description, SP-graph is used as an abbreviation for
* the graph resulted from the SSSP computation. We denote the edges that belong to this
* graph by SP-edges.
*
* - If the removed edge does not belong to the SP-graph, no computation is necessary.
* The edge is simply removed from the graph.
......@@ -55,7 +61,8 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
* or when we reach a vertex with no SP-in-neighbors.
*
* Usage <code>IncrementalSSSPExample &lt;vertex path&gt; &lt;edge path&gt; &lt;edges in SSSP&gt;
* &lt;edge to be removed&gt; &lt;result path&gt; &lt;number of iterations&gt;</code><br>
* &lt;src id edge to be removed&gt; &lt;trg id edge to be removed&gt; &lt;val edge to be removed&gt;
* &lt;result path&gt; &lt;number of iterations&gt;</code><br>
* If no parameters are provided, the program is run with default data from
* {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
*/
......@@ -137,7 +144,7 @@ public class IncrementalSSSPExample implements ProgramDescription {
* @param edgesInSSSP
* @return
*/
private static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
public static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
return edgesInSSSP.filter(new FilterFunction<Edge<Long, Double>>() {
@Override
......@@ -204,7 +211,11 @@ public class IncrementalSSSPExample implements ProgramDescription {
private static String edgesInSSSPInputPath = null;
private static String edgeToBeRemoved = null;
private static Long srcEdgeToBeRemoved = null;
private static Long trgEdgeToBeRemoved = null;
private static Double valEdgeToBeRemoved = null;
private static String outputPath = null;
......@@ -212,19 +223,23 @@ public class IncrementalSSSPExample implements ProgramDescription {
private static boolean parseParameters(String[] args) {
if (args.length > 0) {
if (args.length == 6) {
if (args.length == 8) {
fileOutput = true;
verticesInputPath = args[0];
edgesInputPath = args[1];
edgesInSSSPInputPath = args[2];
edgeToBeRemoved = args[3];
outputPath = args[4];
maxIterations = Integer.parseInt(args[5]);
srcEdgeToBeRemoved = Long.parseLong(args[3]);
trgEdgeToBeRemoved = Long.parseLong(args[4]);
valEdgeToBeRemoved = Double.parseDouble(args[5]);
outputPath = args[6];
maxIterations = Integer.parseInt(args[7]);
} else {
System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data.");
System.out.println("Provide parameters to read input data from files.");
System.out.println("See the documentation for the correct format of input files.");
System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> <output path> <max iterations>");
System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
"<output path> <max iterations>");
return false;
}
......@@ -237,15 +252,10 @@ public class IncrementalSSSPExample implements ProgramDescription {
return env.readCsvFile(verticesInputPath)
.lineDelimiter("\n")
.types(Long.class, Double.class)
.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
@Override
public Vertex<Long, Double> map(Tuple2<Long, Double> tuple2) throws Exception {
return new Vertex<Long, Double>(tuple2.f0, tuple2.f1);
}
});
.map(new Tuple2ToVertexMap<Long, Double>());
} else {
System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
"<output path> <max iterations>");
return IncrementalSSSPData.getDefaultVertexDataSet(env);
}
......@@ -256,15 +266,10 @@ public class IncrementalSSSPExample implements ProgramDescription {
return env.readCsvFile(edgesInputPath)
.lineDelimiter("\n")
.types(Long.class, Long.class, Double.class)
.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
@Override
public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
return new Edge(tuple3.f0, tuple3.f1, tuple3.f2);
}
});
.map(new Tuple3ToEdgeMap<Long, Double>());
} else {
System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
"<output path> <max iterations>");
return IncrementalSSSPData.getDefaultEdgeDataSet(env);
}
......@@ -275,15 +280,10 @@ public class IncrementalSSSPExample implements ProgramDescription {
return env.readCsvFile(edgesInSSSPInputPath)
.lineDelimiter("\n")
.types(Long.class, Long.class, Double.class)
.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
@Override
public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
return new Edge(tuple3.f0, tuple3.f1, tuple3.f2);
}
});
.map(new Tuple3ToEdgeMap<Long, Double>());
} else {
System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
"<output path> <max iterations>");
return IncrementalSSSPData.getDefaultEdgesInSSSP(env);
}
......@@ -291,12 +291,10 @@ public class IncrementalSSSPExample implements ProgramDescription {
private static Edge<Long, Double> getEdgeToBeRemoved() {
if (fileOutput) {
String [] edgeComponents = edgeToBeRemoved.split(",");
return new Edge<Long, Double>(Long.parseLong(edgeComponents[0]), Long.parseLong(edgeComponents[1]),
Double.parseDouble(edgeComponents[2]));
return new Edge<Long, Double>(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved);
} else {
System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
"<output path> <max iterations>");
return IncrementalSSSPData.getDefaultEdgeToBeRemoved();
}
......
......@@ -34,7 +34,7 @@ public class IncrementalSSSPData {
public static final int NUM_VERTICES = 5;
public static final String VERTICES = "1,6.0\n" + "2,2.0\n" + "3,3.0\n" + "4,1.0\n" + "5, 0.0";
public static final String VERTICES = "1,6.0\n" + "2,2.0\n" + "3,3.0\n" + "4,1.0\n" + "5,0.0";
public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
......@@ -77,7 +77,11 @@ public class IncrementalSSSPData {
return env.fromCollection(edges);
}
public static final String EDGE_TO_BE_REMOVED = "2,5,2.0";
public static final String SRC_EDGE_TO_BE_REMOVED = "2";
public static final String TRG_EDGE_TO_BE_REMOVED = "5";
public static final String VAL_EDGE_TO_BE_REMOVED = "2.0";
public static final Edge<Long, Double> getDefaultEdgeToBeRemoved() {
......
......@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.InaccessibleMethodException;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
......@@ -50,9 +51,13 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
// inside an iteration.
// --------------------------------------------------------------------------------------------
private long numberOfVertices;
private long numberOfVertices = -1L;
public long getNumberOfVertices() {
public long getNumberOfVertices() throws Exception{
if (numberOfVertices == -1) {
throw new InaccessibleMethodException("The number of vertices option is not set. " +
"To access the number of vertices, call iterationConfiguration.setOptNumVertices(true).");
}
return numberOfVertices;
}
......
......@@ -95,31 +95,62 @@ public class VertexCentricConfiguration extends IterationConfiguration {
return this.bcVarsMessaging;
}
// ----------------------------------------------------------------------------------
// The direction, degrees and the total number of vertices should be optional.
// The user can access them by setting the direction, degrees or the numVertices options.
// ----------------------------------------------------------------------------------
/**
* Gets whether the degrees option is set.
* By default, the degrees option is not set.
*
* @return True, if the degree option is set, false otherwise.
*/
public boolean isOptDegrees() {
return optDegrees;
}
/**
* Sets the degree option.
* By default, the degrees option is not set.
*
* @param optDegrees True, to set this option, false otherwise.
*/
public void setOptDegrees(boolean optDegrees) {
this.optDegrees = optDegrees;
}
/**
* Gets whether the number of vertices option is set.
* By default, the number of vertices option is not set.
*
* @return True, if the number of vertices option is set, false otherwise.
*/
public boolean isOptNumVertices() {
return optNumVertices;
}
/**
* Sets the number of vertices option.
* By default, the number of vertices option is not set.
*
* @param optNumVertices True, to set this option, false otherwise.
*/
public void setOptNumVertices(boolean optNumVertices) {
this.optNumVertices = optNumVertices;
}
/**
* Gets the direction in which messages are sent in the MessagingFunction.
* By default the messaging direction is OUT.
*
* @return an EdgeDirection, which can be either IN, OUT or ALL.
*/
public EdgeDirection getDirection() {
return direction;
}
/**
* Sets the direction in which messages are sent in the MessagingFunction.
* By default the messaging direction is OUT.
*
* @param direction - IN, OUT or ALL
*/
public void setDirection(EdgeDirection direction) {
this.direction = direction;
}
......
......@@ -67,8 +67,9 @@ import com.google.common.base.Preconditions;
* edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
* </ul>
* <p>
* Vertex-centric graph iterations are instantiated by the
* {@link #withEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method.
*
* Vertex-centric graph iterations are are run by calling
* {@link Graph#runVertexCentricIteration(VertexUpdateFunction, MessagingFunction, int)}.
*
* @param <VertexKey> The type of the vertex key (the vertex identifier).
* @param <VertexValue> The type of the vertex value (the state of the vertex).
......@@ -156,7 +157,7 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
// check whether the numVertices option is set and, if so, compute the total number of vertices
// and set it within the messaging and update functions
if (this.configuration !=null && this.configuration.isOptNumVertices()) {
if (this.configuration != null && this.configuration.isOptNumVertices()) {
try {
long numberOfVertices = graph.numberOfVertices();
messagingFunction.setNumberOfVertices(numberOfVertices);
......@@ -175,14 +176,12 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
// retrieve the direction in which the updates are made and in which the messages are sent
EdgeDirection messagingDirection = messagingFunction.getDirection();
DataSet<Tuple2<VertexKey, Message>> messages = null;
// check whether the degrees option is set and, if so, compute the in and the out degrees and
// add them to the vertex value
if(this.configuration != null && this.configuration.isOptDegrees()) {
return createResultVerticesWithDegrees(graph, messagingDirection, messages, messageTypeInfo);
return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo);
} else {
return createResultSimpleVertex(messagingDirection, messages, messageTypeInfo);
return createResultSimpleVertex(messagingDirection, messageTypeInfo);
}
}
......@@ -554,13 +553,13 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* Creates the operator that represents this vertex centric graph computation for a simple vertex.
*
* @param messagingDirection
* @param messages
* @param messageTypeInfo
* @return the operator
*/
private DataSet<Vertex<VertexKey, VertexValue>> createResultSimpleVertex(EdgeDirection messagingDirection,
DataSet<Tuple2<VertexKey, Message>> messages,
TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
DataSet<Tuple2<VertexKey, Message>> messages;
TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
......@@ -599,16 +598,16 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
*
* @param graph
* @param messagingDirection
* @param messages
* @param messageTypeInfo
* @return the operator
*/
private DataSet<Vertex<VertexKey, VertexValue>> createResultVerticesWithDegrees(
Graph<VertexKey, VertexValue, EdgeValue> graph,
EdgeDirection messagingDirection,
DataSet<Tuple2<VertexKey, Message>> messages,
TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
DataSet<Tuple2<VertexKey, Message>> messages;
this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
DataSet<Tuple2<VertexKey, Long>> inDegrees = graph.inDegrees();
......
......@@ -24,6 +24,7 @@ import java.util.Collection;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.InaccessibleMethodException;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
......@@ -46,9 +47,13 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
// inside an iteration.
// --------------------------------------------------------------------------------------------
private long numberOfVertices;
private long numberOfVertices = -1L;
public long getNumberOfVertices() {
public long getNumberOfVertices() throws Exception{
if (numberOfVertices == -1) {
throw new InaccessibleMethodException("The number of vertices option is not set. " +
"To access the number of vertices, call iterationConfiguration.setOptNumVertices(true).");
}
return numberOfVertices;
}
......
......@@ -249,6 +249,28 @@ public class TestGraphUtils {
return vertices;
}
public static final List<Vertex<Long, Boolean>> getLongBooleanVertices() {
List<Vertex<Long, Boolean>> vertices = new ArrayList<Vertex<Long, Boolean>>();
vertices.add(new Vertex<Long, Boolean>(1L, true));
vertices.add(new Vertex<Long, Boolean>(2L, true));
vertices.add(new Vertex<Long, Boolean>(3L, true));
vertices.add(new Vertex<Long, Boolean>(4L, true));
vertices.add(new Vertex<Long, Boolean>(5L, true));
return vertices;
}
public static final List<Vertex<Long, Tuple3<Long, Long, Boolean>>> getLongVerticesWithDegrees() {
List<Vertex<Long, Tuple3<Long, Long, Boolean>>> vertices = new ArrayList<Vertex<Long, Tuple3<Long, Long, Boolean>>>();
vertices.add(new Vertex<Long, Tuple3<Long, Long, Boolean>>(1L, new Tuple3<Long, Long, Boolean>(1L, 2L, true)));
vertices.add(new Vertex<Long, Tuple3<Long, Long, Boolean>>(2L, new Tuple3<Long, Long, Boolean>(1L, 1L, true)));
vertices.add(new Vertex<Long, Tuple3<Long, Long, Boolean>>(3L, new Tuple3<Long, Long, Boolean>(2L, 2L, true)));
vertices.add(new Vertex<Long, Tuple3<Long, Long, Boolean>>(4L, new Tuple3<Long, Long, Boolean>(1L, 1L, true)));
vertices.add(new Vertex<Long, Tuple3<Long, Long, Boolean>>(5L, new Tuple3<Long, Long, Boolean>(2L, 1L, true)));
return vertices;
}
public static final DataSet<Edge<Long, Long>> getDisconnectedLongLongEdgeData(
ExecutionEnvironment env) {
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
......
......@@ -18,6 +18,7 @@
package org.apache.flink.graph.test;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
......@@ -25,6 +26,10 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.IterationConfiguration;
......@@ -130,6 +135,253 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
"5 15";
}
@Test
public void testIterationDefaultDirection() throws Exception {
/*
* Test that if no direction parameter is given, the iteration works as before
* (i.e. it collects messages from the in-neighbors and sends them to the out-neighbors)
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, HashSet<Long>, Long> graph = Graph
.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
.mapVertices(new InitialiseHashSetMapper());
DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerTrg(), 5)
.getVertices();
resultedVertices.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 [5]\n" +
"2 [1]\n" +
"3 [1, 2]\n" +
"4 [3]\n" +
"5 [3, 4]";
}
@Test
public void testIterationINDirection() throws Exception {
/*
* Test that if the direction parameter is set to IN,
* messages are collected from the out-neighbors and sent to the in-neighbors.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, HashSet<Long>, Long> graph = Graph
.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
.mapVertices(new InitialiseHashSetMapper());
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
parameters.setDirection(EdgeDirection.IN);
DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerSrc(), 5, parameters)
.getVertices();
resultedVertices.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 [2, 3]\n" +
"2 [3]\n" +
"3 [4, 5]\n" +
"4 [5]\n" +
"5 [1]";
}
@Test
public void testIterationALLDirection() throws Exception {
/*
* Test that if the direction parameter is set to ALL,
* messages are collected from all the neighbors and sent to all the neighbors.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, HashSet<Long>, Long> graph = Graph
.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
.mapVertices(new InitialiseHashSetMapper());
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
parameters.setDirection(EdgeDirection.ALL);
DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, parameters)
.getVertices();
resultedVertices.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 [2, 3, 5]\n" +
"2 [1, 3]\n" +
"3 [1, 2, 4, 5]\n" +
"4 [3, 5]\n" +
"5 [1, 3, 4]";
}
@Test
public void testInDegreesSet() throws Exception {
/*
* Test that if the degrees are set, the in degrees can be accessed in every superstep and the value
* is correctly computed.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
parameters.setOptDegrees(true);
DataSet<Vertex<Long, Long>> verticesWithInDegree = graph.runVertexCentricIteration(new UpdateFunctionInDegree(),
new DummyMessageFunction(), 5, parameters).getVertices();
verticesWithInDegree.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 1\n" +
"2 1\n" +
"3 2\n" +
"4 1\n" +
"5 2";
}
@Test
public void testOutDegreesSet() throws Exception {
/*
* Test that if the degrees are set, the out degrees can be accessed in every superstep and the value
* is correctly computed.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
parameters.setOptDegrees(true);
DataSet<Vertex<Long, Long>> verticesWithOutDegree = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(),
new DummyMessageFunction(), 5, parameters).getVertices();
verticesWithOutDegree.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 2\n" +
"2 1\n" +
"3 2\n" +
"4 1\n" +
"5 1";
}
@Test
public void testNumVerticesSet() throws Exception {
/*
* Test that if the number of vertices option is set, it can be accessed in every superstep and the value
* is correctly computed.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
parameters.setOptNumVertices(true);
DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
new DummyMessageFunction(), 5, parameters).getVertices();
verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 5\n" +
"2 5\n" +
"3 5\n" +
"4 5\n" +
"5 5";
}
@Test
public void testDegrees() throws Exception {
/*
* Test that if the degrees are set, they can be accessed in every superstep and the value
* is correctly computed for both in and out degrees.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Tuple3<Long, Long, Boolean>, Long> graph = Graph.fromCollection(TestGraphUtils.getLongVerticesWithDegrees(),
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
parameters.setOptDegrees(true);
DataSet<Vertex<Long, Tuple3<Long, Long, Boolean>>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionDegrees(), new DegreeMessageFunction(), 5, parameters).getVertices();
verticesWithDegrees.map(new MapFunction<Vertex<Long,Tuple3<Long,Long,Boolean>>, Tuple2<Long, Boolean>>() {
@Override
public Tuple2<Long, Boolean> map(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex) throws Exception {
return new Tuple2<Long, Boolean>(vertex.getId(), vertex.getValue().f2);
}
}).writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 true\n" +
"2 true\n" +
"3 true\n" +
"4 true\n" +
"5 true";
}
@Test
public void testDirectionALLAndDegrees() throws Exception {
/*
* Compute the number of neighbors in a vertex - centric manner, and verify that it is equal to
* the sum: inDegree + outDegree.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Boolean, Long> graph = Graph.fromCollection(TestGraphUtils.getLongBooleanVertices(),
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
parameters.setOptDegrees(true);
parameters.setDirection(EdgeDirection.ALL);
DataSet<Vertex<Long, Boolean>> verticesWithNumNeighbors = graph.runVertexCentricIteration(
new VertexUpdateNumNeighbors(), new IdMessenger(), 1, parameters).getVertices();
verticesWithNumNeighbors.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 true\n" +
"2 true\n" +
"3 true\n" +
"4 true\n" +
"5 true";
}
@SuppressWarnings("serial")
public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
......@@ -184,6 +436,45 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(vertex.getInDegree());
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(vertex.getOutDegree());
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionNumVertices extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(getNumberOfVertices());
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SuppressWarnings("serial")
public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
......@@ -203,6 +494,115 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
}
@SuppressWarnings("serial")
public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>,
Long> {
@Override
public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception {
vertex.getValue().clear();
for(long msg : messages) {
vertex.getValue().add(msg);
}
setNewVertexValue(vertex.getValue());
}
}
@SuppressWarnings("serial")
public static final class VertexUpdateNumNeighbors extends VertexUpdateFunction<Long, Boolean,
Long> {
@Override
public void updateVertex(Vertex<Long, Boolean> vertex, MessageIterator<Long> messages) throws Exception {
long count = 0;
for(long msg : messages) {
count++;
}
setNewVertexValue(count == (vertex.getInDegree() + vertex.getOutDegree()));
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Tuple3<Long, Long, Boolean>, Long> {
@Override
public void updateVertex(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(new Tuple3(vertex.getValue().f0, vertex.getValue().f1, (vertex.getInDegree() == vertex.getValue().f0)
&& (vertex.getOutDegree() == vertex.getValue().f1) && vertex.getValue().f2));
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SuppressWarnings("serial")
public static final class IdMessengerSrc extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
@Override
public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
for (Edge<Long, Long> edge : getEdges()) {
sendMessageTo(edge.getSource(), vertex.getId());
}
}
}
@SuppressWarnings("serial")
public static final class IdMessengerAll extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
@Override
public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
for (Edge<Long, Long> edge : getEdges()) {
if(edge.getSource() != vertex.getId()) {
sendMessageTo(edge.getSource(), vertex.getId());
} else {
sendMessageTo(edge.getTarget(), vertex.getId());
}
}
}
}
@SuppressWarnings("serial")
public static final class IdMessenger extends MessagingFunction<Long, Boolean, Long, Long> {
@Override
public void sendMessages(Vertex<Long, Boolean> vertex) throws Exception {
for (Edge<Long, Long> edge : getEdges()) {
if(edge.getSource() != vertex.getId()) {
sendMessageTo(edge.getSource(), vertex.getId());
} else {
sendMessageTo(edge.getTarget(), vertex.getId());
}
}
}
}
@SuppressWarnings("serial")
public static final class IdMessengerTrg extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
@Override
public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
for (Edge<Long, Long> edge : getEdges()) {
sendMessageTo(edge.getTarget(), vertex.getId());
}
}
}
@SuppressWarnings("serial")
public static final class DegreeMessageFunction extends MessagingFunction<Long, Tuple3<Long, Long, Boolean>, Long, Long> {
@Override
public void sendMessages(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex) {
//send message to keep vertices active
sendMessageToAllNeighbors(vertex.getValue().f0);
}
}
@SuppressWarnings("serial")
public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
......@@ -210,4 +610,13 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
return 1l;
}
}
@SuppressWarnings("serial")
public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
@Override
public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
return new HashSet<Long>();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.test;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.fail;
public class VertexCentricConfigurationWithExceptionITCase {
private static final int PARALLELISM = 4;
private static ForkableFlinkMiniCluster cluster;
@BeforeClass
public static void setupCluster() {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
cluster = new ForkableFlinkMiniCluster(config, false);
}
catch (Exception e) {
e.printStackTrace();
fail("Error starting test cluster: " + e.getMessage());
}
}
@AfterClass
public static void tearDownCluster() {
try {
cluster.stop();
}
catch (Throwable t) {
t.printStackTrace();
fail("Cluster shutdown caused an exception: " + t.getMessage());
}
}
@Test
public void testOutDegreesNotSet() throws Exception {
/*
* Test that if the degrees are not set, the out degrees cannot be accessed - an
* exception is thrown.
*/
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
try {
DataSet<Vertex<Long, Long>> verticesWithOutDegrees = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(),
new DummyMessageFunction(), 5).getVertices();
verticesWithOutDegrees.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
env.execute();
fail("The degree option not set test did not fail");
} catch (Exception e) {
// We expect the job to fail with an exception
}
}
@Test
public void testInDegreesNotSet() throws Exception {
/*
* Test that if the degrees are not set, the in degrees cannot be accessed - an
* exception is thrown.
*/
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
try {
DataSet<Vertex<Long, Long>> verticesWithInDegrees = graph.runVertexCentricIteration(new UpdateFunctionInDegree(),
new DummyMessageFunction(), 5).getVertices();
verticesWithInDegrees.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
env.execute();
fail("The degree option not set test did not fail");
} catch (Exception e) {
// We expect the job to fail with an exception
}
}
@Test
public void testNumVerticesNotSet() throws Exception {
/*
* Test that if the number of vertices option is not set, this number cannot be accessed -
* an exception id thrown.
*/
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
try {
DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
new DummyMessageFunction(), 5).getVertices();
verticesWithNumVertices.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
env.execute();
fail("The num vertices option not set test did not fail");
} catch (Exception e) {
// We expect the job to fail with an exception
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
setNewVertexValue(vertex.getInDegree());
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
setNewVertexValue(vertex.getOutDegree());
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionNumVertices extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
setNewVertexValue(getNumberOfVertices());
}
}
@SuppressWarnings("serial")
public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
@Override
public void sendMessages(Vertex<Long, Long> vertex) {
//send message to keep vertices active
sendMessageToAllNeighbors(vertex.getValue());
}
}
}
......@@ -20,8 +20,15 @@ package org.apache.flink.graph.test.example;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.IncrementalSSSPExample;
import org.apache.flink.graph.example.utils.IncrementalSSSPData;
import org.apache.flink.graph.spargel.IterationConfiguration;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After;
import org.junit.Before;
......@@ -73,10 +80,52 @@ public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
@Test
public void testIncrementalSSSPExample() throws Exception {
IncrementalSSSPExample.main(new String[]{verticesPath, edgesPath, edgesInSSSPPath,
IncrementalSSSPData.EDGE_TO_BE_REMOVED, resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
expected = IncrementalSSSPData.RESULTED_VERTICES;
}
@Test
public void testIncrementalSSSPNonSPEdge() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env);
DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);
// the edge to be removed is a non-SP edge
Edge<Long, Double> edgeToBeRemoved = new Edge<Long, Double>(3L, 5L, 5.0);
Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
// Assumption: all minimum weight paths are kept
Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
// remove the edge
graph.removeEdge(edgeToBeRemoved);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
if(IncrementalSSSPExample.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
parameters.setDirection(EdgeDirection.IN);
parameters.setOptDegrees(true);
// run the vertex centric iteration to propagate info
Graph<Long, Double, Double> result = ssspGraph.runVertexCentricIteration(
new IncrementalSSSPExample.VertexDistanceUpdater(),
new IncrementalSSSPExample.InvalidateMessenger(edgeToBeRemoved),
IncrementalSSSPData.NUM_VERTICES, parameters);
DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
resultedVertices.writeAsCsv(resultPath, "\n", ",");
env.execute();
} else {
vertices.writeAsCsv(resultPath, "\n", ",");
env.execute();
}
expected = IncrementalSSSPData.VERTICES;
}
@After
public void after() throws Exception {
compareResultsByLinesInMemory(expected, resultPath);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册