Licensed to the Apache Software Foundation (ASF) under one
......@@ -677,7 +676,7 @@ The vertex-centric model, also known as "think like a vertex" model, expresses c
* <strong>Messaging</strong>: produce the messages that a vertex will send to other vertices.
* <strong>Value Update</strong>: update the vertex value using the received messages.
Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations. The user only needs to implement two functions, corresponding to the phases above: a `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
Gelly provides methods for vertex-centric iterations. The user only needs to implement two functions, corresponding to the phases above: a `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`. This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values.
A vertex-centric iteration can be extended with information such as the total number of vertices, the in-degree, and the out-degree of each vertex.
......@@ -1430,114 +1429,3 @@ env.execute
......@@ -17,13 +17,14 @@
package org.apache.flink.spargel.java;
package org.apache.flink.graph.spargel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -31,6 +32,9 @@ import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
......@@ -38,13 +42,16 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
import org.apache.flink.types.NullValue;
import org.apache.flink.graph.library.ConnectedComponents;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
public class SpargelCompilerTest extends CompilerTestBase {
private static final long serialVersionUID = 1L;
public void testSpargelCompiler() {
try {
......@@ -52,15 +59,27 @@ public class SpargelCompilerTest extends CompilerTestBase {
// compose test program
DataSet<Long> vertexIds = env.generateSequence(1, 2);
DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
.map(new Tuple2ToVertexMap<Long, Long>());
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
new ConnectedComponents.CCUpdater<Long>(),
new ConnectedComponents.CCMessenger<Long>(), 100)
result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
Plan p = env.createProgramPlan("Spargel Connected Components");
......@@ -110,6 +129,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
public void testSpargelCompilerWithBroadcastVariable() {
try {
......@@ -121,21 +141,32 @@ public class SpargelCompilerTest extends CompilerTestBase {
// compose test program
DataSet<Long> bcVar = env.fromElements(1L);
DataSet<Long> vertexIds = env.generateSequence(1, 2);
DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
VertexCentricIteration<Long, Long, Long, ?> vcIter = VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100);
vcIter.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
vcIter.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(vcIter);
result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
.map(new Tuple2ToVertexMap<Long, Long>());
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
new ConnectedComponents.CCUpdater<Long>(),
new ConnectedComponents.CCMessenger<Long>(), 100)
result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
Plan p = env.createProgramPlan("Spargel Connected Components");
......@@ -17,7 +17,7 @@
package org.apache.flink.spargel.java;
package org.apache.flink.graph.spargel;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
......@@ -26,12 +26,18 @@ import static org.junit.Assert.fail;
import org.junit.Test;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.DeltaIterationResultSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.TwoInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.NullValue;
public class SpargelTranslationTest {
......@@ -57,29 +63,36 @@ public class SpargelTranslationTest {
DataSet<Long> bcMessaging = env.fromElements(1L);
DataSet<Long> bcUpdate = env.fromElements(1L);
DataSet<Tuple2<String, Double>> result;
DataSet<Vertex<String, Double>> result;
// ------------ construct the test program ------------------
DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
VertexCentricIteration<String, Double, Long, ?> vertexIteration =
VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
result = initialVertices.runOperation(vertexIteration);
Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
public Tuple3<String, String, NullValue> map(
Tuple2<String, String> edge) {
return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
}), env);
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
result = graph.runVertexCentricIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
NUM_ITERATIONS, parameters).getVertices();
result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
......@@ -136,29 +149,36 @@ public class SpargelTranslationTest {
DataSet<Long> bcVar = env.fromElements(1L);
DataSet<Tuple2<String, Double>> result;
DataSet<Vertex<String, Double>> result;
// ------------ construct the test program ------------------
DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
public Tuple3<String, String, NullValue> map(
Tuple2<String, String> edge) {
return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
}), env);
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
VertexCentricIteration<String, Double, Long, ?> vertexIteration =
VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
result = initialVertices.runOperation(vertexIteration);
result = graph.runVertexCentricIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
NUM_ITERATIONS, parameters).getVertices();
result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
......@@ -200,12 +220,12 @@ public class SpargelTranslationTest {
public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> {
public void updateVertex(String vertexKey, Double vertexValue, MessageIterator<Long> inMessages) {}
public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {}
public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, Object> {
public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, NullValue> {
public void sendMessages(String vertexKey, Double vertexValue) {}
public void sendMessages(Vertex<String, Double> vertex) {}
<?xml version="1.0" encoding="UTF-8"?>
package org.apache.flink.spargel.java;
import java.util.Iterator;
import org.apache.flink.api.java.tuple.Tuple2;
* An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
* the <i>foreach</i> syntax.
public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
private static final long serialVersionUID = 1L;
private transient Iterator<Tuple2<?, Message>> source;
final void setSource(Iterator<Tuple2<?, Message>> source) {
this.source = source;
public final boolean hasNext() {
return this.source.hasNext();
public final Message next() {
return this.source.next().f1;
public final void remove() {
throw new UnsupportedOperationException();
public Iterator<Message> iterator() {
return this;
package org.apache.flink.spargel.java;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.spargel.java.OutgoingEdge;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
* The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
* @param <VertexKey> The type of the vertex key (the vertex identifier).
* @param <VertexValue> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
* @param <EdgeValue> The type of the values that are associated with the edges.
public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable {
private static final long serialVersionUID = 1L;
// --------------------------------------------------------------------------------------------
// Public API Methods
// --------------------------------------------------------------------------------------------
* This method is invoked once per superstep for each vertex that was changed in that superstep.
* It needs to produce the messages that will be received by vertices in the next superstep.
* @param vertexKey The key of the vertex that was changed.
* @param vertexValue The value (state) of the vertex that was changed.
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
* This method is executed once per superstep before the vertex update function is invoked for each vertex.
* @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
public void preSuperstep() throws Exception {}
* This method is executed once per superstep after the vertex update function has been invoked for each vertex.
* @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
public void postSuperstep() throws Exception {}
* Gets an {@link java.lang.Iterable} with all outgoing edges. This method is mutually exclusive with
* {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
* @return An iterable over all outgoing edges.
public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() {
if (edgesUsed) {
throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
edgesUsed = true;
if (this.edgeWithValueIter != null) {
this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>) edges);
return this.edgeWithValueIter;
} else {
this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges);
return this.edgeNoValueIter;
* Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
* This method is mutually exclusive with the method {@link #getOutgoingEdges()} and may be called only once.
* @param m The message to send.
public void sendMessageToAllNeighbors(Message m) {
if (edgesUsed) {
throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
edgesUsed = true;
outValue.f1 = m;
while (edges.hasNext()) {
Tuple next = (Tuple) edges.next();
VertexKey k = next.getField(1);
outValue.f0 = k;
* Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
* the next superstep will cause an exception due to a non-deliverable message.
* @param target The key (id) of the target vertex to message.
* @param m The message.
public void sendMessageTo(VertexKey target, Message m) {
outValue.f0 = target;
outValue.f1 = m;
// --------------------------------------------------------------------------------------------
* Gets the number of the superstep, starting at <tt>1</tt>.
* @return The number of the current superstep.
public int getSuperstepNumber() {
return this.runtimeContext.getSuperstepNumber();
* Gets the iteration aggregator registered under the given name. The iteration aggregator combines
* all aggregates globally once per superstep and makes them available in the next superstep.
* @param name The name of the aggregator.
* @return The aggregator registered under this name, or null, if no aggregator was registered.
public <T extends Aggregator<?>> T getIterationAggregator(String name) {
return this.runtimeContext.<T>getIterationAggregator(name);
* Get the aggregated value that an aggregator computed in the previous iteration.
* @param name The name of the aggregator.
* @return The aggregated value of the previous iteration.
public <T extends Value> T getPreviousIterationAggregate(String name) {
return this.runtimeContext.<T>getPreviousIterationAggregate(name);
* Gets the broadcast data set registered under the given name. Broadcast data sets
* are available on all parallel instances of a function. They can be registered via
* {@link VertexCentricIteration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
* @param name The name under which the broadcast set is registered.
* @return The broadcast data set.
public <T> Collection<T> getBroadcastSet(String name) {
return this.runtimeContext.<T>getBroadcastVariable(name);
// --------------------------------------------------------------------------------------------
// internal methods and state
// --------------------------------------------------------------------------------------------
private Tuple2<VertexKey, Message> outValue;
private IterationRuntimeContext runtimeContext;
private Iterator<?> edges;
private Collector<Tuple2<VertexKey, Message>> out;
private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter;
private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter;
private boolean edgesUsed;
void init(IterationRuntimeContext context, boolean hasEdgeValue) {
this.runtimeContext = context;
this.outValue = new Tuple2<VertexKey, Message>();
if (hasEdgeValue) {
this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
} else {
this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
this.edges = edges;
this.out = out;
this.edgesUsed = false;
private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue>
implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
private Iterator<Tuple2<VertexKey, VertexKey>> input;
private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
this.input = input;
public boolean hasNext() {
return input.hasNext();
public OutgoingEdge<VertexKey, EdgeValue> next() {
Tuple2<VertexKey, VertexKey> next = input.next();
edge.set(next.f1, null);
return edge;
public void remove() {
throw new UnsupportedOperationException();
public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
return this;
private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue>
implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input;
private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) {
this.input = input;
public boolean hasNext() {
return input.hasNext();
public OutgoingEdge<VertexKey, EdgeValue> next() {
Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next();
edge.set(next.f1, next.f2);
return edge;
public void remove() {
throw new UnsupportedOperationException();
public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
return this;
package org.apache.flink.spargel.java;
* <tt>OutgoingEdge</tt> objects represent edges between vertices. Edges are defined by their source and target
* vertex id. Edges may have an associated value (for example a weight or a distance), if the
* graph algorithm was initialized with the
* {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)}
* method.
* @param <VertexKey> The type of the vertex key.
* @param <EdgeValue> The type of the value associated with the edge. For scenarios where the edges do not hold
* value, this type may be arbitrary.
public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue> implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private VertexKey target;
private EdgeValue edgeValue;
void set(VertexKey target, EdgeValue edgeValue) {
this.target = target;
this.edgeValue = edgeValue;
* Gets the target vertex id.
* @return The target vertex id.
public VertexKey target() {
return target;
* Gets the value associated with the edge. The value may be null if the iteration was initialized with
* an edge data set without edge values.
* Typical examples of edge values are weights or distances of the path represented by the edge.
* @return The value associated with the edge.
public EdgeValue edgeValue() {
return edgeValue;
package org.apache.flink.spargel.java;
import java.io.Serializable;
import java.util.Collection;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
* This class must be extended by functions that compute the state of the vertex depending on the old state and the
* incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is
* invoked once per vertex per superstep.
* @param <VertexKey> The vertex key type.
* @param <VertexValue> The vertex value type.
* @param <Message> The message type.
public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message> implements Serializable {
private static final long serialVersionUID = 1L;
// --------------------------------------------------------------------------------------------
// Public API Methods
// --------------------------------------------------------------------------------------------
* This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
* the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
* state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
* @param vertexKey The key (identifier) of the vertex.
* @param vertexValue The value (state) of the vertex.
* @param inMessages The incoming messages to this vertex.
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
* This method is executed once per superstep before the vertex update function is invoked for each vertex.
* @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
public void preSuperstep() throws Exception {}
* This method is executed once per superstep after the vertex update function has been invoked for each vertex.
* @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
public void postSuperstep() throws Exception {}
* Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
* @param newValue The new vertex value.
public void setNewVertexValue(VertexValue newValue) {
outVal.f1 = newValue;
* Gets the number of the superstep, starting at <tt>1</tt>.
* @return The number of the current superstep.
public int getSuperstepNumber() {
return this.runtimeContext.getSuperstepNumber();
* Gets the iteration aggregator registered under the given name. The iteration aggregator combines
* all aggregates globally once per superstep and makes them available in the next superstep.
* @param name The name of the aggregator.
* @return The aggregator registered under this name, or null, if no aggregator was registered.
public <T extends Aggregator<?>> T getIterationAggregator(String name) {
return this.runtimeContext.<T>getIterationAggregator(name);
* Get the aggregated value that an aggregator computed in the previous iteration.
* @param name The name of the aggregator.
* @return The aggregated value of the previous iteration.
public <T extends Value> T getPreviousIterationAggregate(String name) {
return this.runtimeContext.<T>getPreviousIterationAggregate(name);
* Gets the broadcast data set registered under the given name. Broadcast data sets
* are available on all parallel instances of a function. They can be registered via
* {@link VertexCentricIteration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
* @param name The name under which the broadcast set is registered.
* @return The broadcast data set.
public <T> Collection<T> getBroadcastSet(String name) {
return this.runtimeContext.<T>getBroadcastVariable(name);
// --------------------------------------------------------------------------------------------
// internal methods
// --------------------------------------------------------------------------------------------
private IterationRuntimeContext runtimeContext;
private Collector<Tuple2<VertexKey, VertexValue>> out;
private Tuple2<VertexKey, VertexValue> outVal;
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
void setOutput(Tuple2<VertexKey, VertexValue> val, Collector<Tuple2<VertexKey, VertexValue>> out) {
this.out = out;
this.outVal = val;
package org.apache.flink.spargel.java.examples;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.MessagingFunction;
import org.apache.flink.spargel.java.VertexCentricIteration;
import org.apache.flink.spargel.java.VertexUpdateFunction;
import org.apache.flink.types.NullValue;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@SuppressWarnings({"serial", "unchecked"})
public class SpargelConnectedComponents {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> vertexIds = env.generateSequence(0, 10);
DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(0L, 2L), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Long, Long>(4L, 8L),
new Tuple2<Long, Long>(1L, 5L), new Tuple2<Long, Long>(3L, 7L), new Tuple2<Long, Long>(3L, 9L));
DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
env.execute("Spargel Connected Components");
public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
long min = Long.MAX_VALUE;
for (long msg : inMessages) {
min = Math.min(min, msg);
if (min < vertexValue) {
public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> {
public void sendMessages(Long vertexId, Long componentId) {
* A map function that takes a Long value and creates a 2-tuple out of it:
* <pre>(Long value) -> (value, value)</pre>
public static final class IdAssigner implements MapFunction<Long, Tuple2<Long, Long>> {
public Tuple2<Long, Long> map(Long value) {
return new Tuple2<Long, Long>(value, value);
package org.apache.flink.spargel.java.examples;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.MessagingFunction;
import org.apache.flink.spargel.java.OutgoingEdge;
import org.apache.flink.spargel.java.VertexCentricIteration;
import org.apache.flink.spargel.java.VertexUpdateFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
* An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
* In this implementation, the edges carry a weight (the transition probability).
public class SpargelPageRank {
private static final double BETA = 0.85;
public static void main(String[] args) throws Exception {
final int numVertices = 100;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// enumerate some sample edges and assign an initial uniform probability (rank)
DataSet<Tuple2<Long, Double>> intialRanks = env.generateSequence(1, numVertices)
.map(new MapFunction<Long, Tuple2<Long, Double>>() {
public Tuple2<Long, Double> map(Long value) {
return new Tuple2<Long, Double>(value, 1.0/numVertices);
// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
int numOutEdges = (int) (Math.random() * (numVertices / 2));
for (int i = 0; i < numOutEdges; i++) {
long target = (long) (Math.random() * numVertices) + 1;
out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
DataSet<Tuple2<Long, Double>> result = intialRanks.runOperation(
new VertexRankUpdater(numVertices, BETA), new RankMessenger(), 20));
env.execute("Spargel PageRank");
* Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
* and then applying the dampening formula.
public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
private final long numVertices;
private final double beta;
public VertexRankUpdater(long numVertices, double beta) {
this.numVertices = numVertices;
this.beta = beta;
public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
double rankSum = 0.0;
for (double msg : inMessages) {
rankSum += msg;
// apply the dampening factor / random jump
double newRank = (beta * rankSum) + (1-BETA)/numVertices;
* Distributes the rank of a vertex among all target vertices according to the transition probability,
* which is associated with an edge as the edge value.
// Messaging function for PageRank: forwards a share of the vertex's rank along every outgoing edge.
public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
// For each outgoing edge, sends the edge's target the rank multiplied by the edge value.
public void sendMessages(Long vertexId, Double newRank) {
for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
sendMessageTo(edge.target(), newRank * edge.edgeValue());
package org.apache.flink.spargel.java.examples;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.MessagingFunction;
import org.apache.flink.spargel.java.OutgoingEdge;
import org.apache.flink.spargel.java.VertexCentricIteration;
import org.apache.flink.spargel.java.VertexUpdateFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
 * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
 * In this implementation, the edges carry a weight (the transition probability).
 */
public class SpargelPageRankCountingVertices {
// Dampening factor handed to the rank updater.
private static final double BETA = 0.85;
// Generates a random sample graph, counts its vertices inside the data flow, and runs
// a vertex-centric PageRank with a maximum of 20 iterations.
public static void main(String[] args) throws Exception {
final int NUM_VERTICES = 100;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// a list of vertices
DataSet<Long> vertices = env.generateSequence(1, NUM_VERTICES);
// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
for (int i = 0; i < numOutEdges; i++) {
long target = (long) (Math.random() * NUM_VERTICES) + 1;
out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
// ---------- start of the algorithm ---------------
// count the number of vertices
DataSet<Long> count = vertices
.map(new MapFunction<Long, Long>() {
public Long map(Long value) {
return 1L;
.reduce(new ReduceFunction<Long>() {
public Long reduce(Long value1, Long value2) {
return value1 + value2;
// assign every vertex an initial uniform probability (rank) of 1/count;
// the count arrives through the "count" broadcast set
DataSet<Tuple2<Long, Double>> initialRanks = vertices
.map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
private long numVertices;
public void open(Configuration parameters) {
numVertices = getRuntimeContext().<Long>getBroadcastVariable("count").iterator().next();
public Tuple2<Long, Double> map(Long value) {
return new Tuple2<Long, Double>(value, 1.0/numVertices);
}).withBroadcastSet(count, "count");
VertexCentricIteration<Long, Double, Double, Double> iteration = VertexCentricIteration.withValuedEdges(edgesWithProbability,
new VertexRankUpdater(BETA), new RankMessenger(), 20);
// the update function reads the vertex count from this broadcast set before each superstep
iteration.addBroadcastSetForUpdateFunction("count", count);
DataSet<Tuple2<Long, Double>> result = initialRanks.runOperation(iteration);
env.execute("Spargel PageRank");
/**
 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
 * and then applying the dampening formula.
 */
public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
// Dampening factor applied to the summed incoming rank.
private final double beta;
// Vertex count, refreshed from the "count" broadcast set in preSuperstep().
private long numVertices;
public VertexRankUpdater(double beta) {
this.beta = beta;
// Reads the vertex count from the "count" broadcast set.
public void preSuperstep() {
numVertices = this.<Long>getBroadcastSet("count").iterator().next();
// Sums all incoming partial ranks and derives the dampened rank from the sum.
public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
double rankSum = 0.0;
for (double msg : inMessages) {
rankSum += msg;
// apply the dampening factor / random jump
// Both terms use the factor passed to the constructor rather than the outer BETA
// constant, so the formula stays consistent for any configured beta.
double newRank = (beta * rankSum) + (1 - beta) / numVertices;
/**
 * Distributes the rank of a vertex among all target vertices according to the transition probability,
 * which is associated with an edge as the edge value.
 */
public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
// For each outgoing edge, sends the edge's target the rank multiplied by the edge value.
public void sendMessages(Long vertexId, Double newRank) {
for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
sendMessageTo(edge.target(), newRank * edge.edgeValue());
package org.apache.flink.spargel.java.record;
import org.apache.flink.types.Key;
import org.apache.flink.types.Value;
// Mutable holder that pairs an edge's target vertex key with the edge's value.
public final class Edge<VertexKey extends Key<VertexKey>, EdgeValue extends Value> {
// Key of the vertex this edge points to.
private VertexKey target;
// Value attached to this edge.
private EdgeValue edgeValue;
// Package-private setter that overwrites both fields in place.
void set(VertexKey target, EdgeValue edgeValue) {
this.target = target;
this.edgeValue = edgeValue;
// Returns the target vertex key most recently passed to set().
public VertexKey target() {
return target;
// Returns the edge value most recently passed to set().
public EdgeValue edgeValue() {
return edgeValue;
package org.apache.flink.spargel.java.record;
import java.util.Iterator;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
// Iterator over the messages sent to a vertex, backed by an iterator of records.
// It is also its own Iterable, so it can be used directly in a for-each loop.
public final class MessageIterator<Message extends Value> implements Iterator<Message>, Iterable<Message> {
// Single message object that every call to next() fills and returns.
private final Message instance;
// Record iterator that currently supplies the messages.
private Iterator<Record> source;
// Creates the iterator around the message object that next() will reuse.
public MessageIterator(Message instance) {
this.instance = instance;
// Points this iterator at a new sequence of records.
public final void setSource(Iterator<Record> source) {
this.source = source;
// Delegates to the underlying record iterator.
public final boolean hasNext() {
return this.source.hasNext();
// Reads field 1 of the next record into the shared instance and returns that same
// object, so a previously returned message is overwritten by the following call.
public final Message next() {
this.source.next().getFieldInto(1, this.instance);
return this.instance;
// Removal is not supported.
public final void remove() {
throw new UnsupportedOperationException();
// Returns this object itself rather than a fresh iterator.
public Iterator<Message> iterator() {
return this;
package org.apache.flink.spargel.java.record;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
// Base class for the user-defined function that produces the messages a vertex sends.
// It exposes the vertex's outgoing edges and helpers that address messages to target vertices.
public abstract class MessagingFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> implements Serializable {
// --------------------------------------------------------------------------------------------
// Public API Methods
// --------------------------------------------------------------------------------------------
// Implemented by subclasses to send messages for the given vertex key and value.
public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
// Optional hooks; all three are empty by default and may be overridden.
public void setup(Configuration config) throws Exception {}
public void preSuperstep() throws Exception {}
public void postSuperstep() throws Exception {}
// Returns the iterator over the outgoing edges. The edges may be consumed only once per
// vertex: either through this method or through sendMessageToAllNeighbors(), not both.
public Iterator<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
if (edgesUsed) {
// The message names the method that actually exists in this class.
throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()'.");
edgesUsed = true;
return edgeIter;
// Addresses the message m to the vertex key found in field 1 of every edge record.
// Shares the single-use restriction with getOutgoingEdges().
public void sendMessageToAllNeighbors(Message m) {
if (edgesUsed) {
// The message names the method that actually exists in this class.
throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()'.");
edgesUsed = true;
while (edges.hasNext()) {
Record next = edges.next();
VertexKey k = next.getField(1, this.keyClass);
// field 0 carries the target key, field 1 the message
outValue.setField(0, k);
outValue.setField(1, m);
// Addresses the message m to one explicitly given target vertex.
public void sendMessageTo(VertexKey target, Message m) {
outValue.setField(0, target);
outValue.setField(1, m);
// --------------------------------------------------------------------------------------------
// Returns the superstep number reported by the iteration runtime context.
public int getSuperstep() {
return this.runtimeContext.getSuperstepNumber();
// Looks up an iteration aggregator by name in the runtime context.
public <T extends Aggregator<?>> T getIterationAggregator(String name) {
return this.runtimeContext.<T>getIterationAggregator(name);
// Looks up the previous iteration's aggregate by name in the runtime context.
public <T extends Value> T getPreviousIterationAggregate(String name) {
return this.runtimeContext.<T>getPreviousIterationAggregate(name);
// --------------------------------------------------------------------------------------------
// internal methods and state
// --------------------------------------------------------------------------------------------
// Reused output record: field 0 = target key, field 1 = message.
private Record outValue;
private IterationRuntimeContext runtimeContext;
// Edge records of the vertex currently being processed.
private Iterator<Record> edges;
private Collector<Record> out;
private EdgesIterator<VertexKey, EdgeValue> edgeIter;
private Class<VertexKey> keyClass;
// True once the edges of the current vertex have been handed out or consumed.
private boolean edgesUsed;
// One-time initialization: stores the runtime context and creates the reusable edge
// iterator and output record from the given holder objects.
void init(IterationRuntimeContext context, VertexKey keyHolder, EdgeValue edgeValueHolder) {
this.runtimeContext = context;
this.edgeIter = new EdgesIterator<VertexKey, EdgeValue>(keyHolder, edgeValueHolder);
this.outValue = new Record();
this.keyClass = (Class<VertexKey>) keyHolder.getClass();
// Per-vertex setup: installs the edge records and the collector and clears the single-use flag.
void set(Iterator<Record> edges, Collector<Record> out) {
this.edges = edges;
this.out = out;
this.edgesUsed = false;
private static final long serialVersionUID = 1L;
// Adapts an iterator of edge records to an iterator of Edge objects. A single Edge and
// a single pair of holder objects are reused for every element.
private static final class EdgesIterator<VertexKey extends Key<VertexKey>, EdgeValue extends Value> implements Iterator<Edge<VertexKey, EdgeValue>> {
private Iterator<Record> input;
private VertexKey keyHolder;
private EdgeValue edgeValueHolder;
private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
EdgesIterator(VertexKey keyHolder, EdgeValue edgeValueHolder) {
this.keyHolder = keyHolder;
this.edgeValueHolder = edgeValueHolder;
// Points this iterator at a new sequence of edge records.
void set(Iterator<Record> input) {
this.input = input;
public boolean hasNext() {
return input.hasNext();
// Reads fields 0 and 1 of the next record into the holders and returns the shared Edge,
// so a previously returned Edge is overwritten by the following call.
public Edge<VertexKey, EdgeValue> next() {
Record next = input.next();
next.getFieldInto(0, keyHolder);
next.getFieldInto(1, edgeValueHolder);
edge.set(keyHolder, edgeValueHolder);
return edge;
// Removal is not supported.
public void remove() {
throw new UnsupportedOperationException();
package org.apache.flink.spargel.java.record;
import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.DeltaIteration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.ReflectionUtil;
// Assembles a vertex-centric iteration for the Record API from a delta iteration and two
// co-group operators that wrap the user's messaging and vertex update functions.
public class SpargelIteration {
private static final String DEFAULT_NAME = "<unnamed vertex-centric iteration>";
private final DeltaIteration iteration;
// Types extracted from the messaging function's generic parameters.
private final Class<? extends Key<?>> vertexKey;
private final Class<? extends Value> vertexValue;
private final Class<? extends Value> messageType;
private final Class<? extends Value> edgeValue;
private final CoGroupOperator vertexUpdater;
private final CoGroupOperator messager;
// ----------------------------------------------------------------------------------
// Creates the iteration under the default name.
public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value>
SpargelIteration(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
VertexUpdateFunction<VertexKey, VertexValue, Message> uf)
this(mf, uf, DEFAULT_NAME);
// Creates the iteration under the given name: resolves the four types from the messaging
// function, builds both operators, and serializes the user functions into their parameters.
public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> SpargelIteration(
MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
String name)
// get the types
this.vertexKey = ReflectionUtil.getTemplateType1(mf.getClass());
this.vertexValue = ReflectionUtil.getTemplateType2(mf.getClass());
this.messageType = ReflectionUtil.getTemplateType3(mf.getClass());
this.edgeValue = ReflectionUtil.getTemplateType4(mf.getClass());
if (vertexKey == null || vertexValue == null || messageType == null || edgeValue == null) {
// say which step failed instead of throwing an exception without any message
throw new RuntimeException("Could not determine the vertex key, vertex value, message, and edge value types from the MessagingFunction.");
// instantiate the data flow
this.iteration = new DeltaIteration(0, name);
this.messager = CoGroupOperator.builder(MessagingDriver.class, vertexKey, 0, 0)
.name("Message Sender")
this.vertexUpdater = CoGroupOperator.builder(VertexUpdateDriver.class, vertexKey, 0, 0)
.name("Vertex Updater")
// parameterize the data flow
try {
Configuration vertexUdfParams = vertexUpdater.getParameters();
InstantiationUtil.writeObjectToConfig(uf, vertexUdfParams, VertexUpdateDriver.UDF_PARAM);
vertexUdfParams.setClass(VertexUpdateDriver.KEY_PARAM, vertexKey);
vertexUdfParams.setClass(VertexUpdateDriver.VALUE_PARAM, vertexValue);
vertexUdfParams.setClass(VertexUpdateDriver.MESSAGE_PARAM, messageType);
Configuration messageUdfParams = messager.getParameters();
InstantiationUtil.writeObjectToConfig(mf, messageUdfParams, MessagingDriver.UDF_PARAM);
messageUdfParams.setClass(MessagingDriver.KEY_PARAM, vertexKey);
messageUdfParams.setClass(MessagingDriver.VALUE_PARAM, vertexValue);
messageUdfParams.setClass(MessagingDriver.MESSAGE_PARAM, messageType);
messageUdfParams.setClass(MessagingDriver.EDGE_PARAM, edgeValue);
catch (IOException e) {
// both branches are Strings, matching the message construction in the driver classes
throw new RuntimeException("Could not serialize the UDFs for distribution" +
(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
// ----------------------------------------------------------------------------------
// inputs and outputs
// ----------------------------------------------------------------------------------
public void setVertexInput(Operator<Record> c) {
public void setEdgesInput(Operator<Record> c) {
// The delta iteration itself is the output of the assembled data flow.
public Operator<?> getOutput() {
return this.iteration;
public void setParallelism(int parallelism) {
public void setNumberOfIterations(int iterations) {
// Exposes the aggregator registry of the underlying delta iteration.
public AggregatorRegistry getAggregators() {
return this.iteration.getAggregators();
// --------------------------------------------------------------------------------------------
// Wrapping UDFs
// --------------------------------------------------------------------------------------------
// Co-group function that feeds a vertex and its incoming messages to the user's VertexUpdateFunction.
public static final class VertexUpdateDriver<K extends Key<K>, V extends Value, M extends Value> extends CoGroupFunction {
private static final long serialVersionUID = 1L;
// Configuration keys under which the constructor above stores the UDF and its types.
private static final String UDF_PARAM = "spargel.udf";
private static final String KEY_PARAM = "spargel.key-type";
private static final String VALUE_PARAM = "spargel.value-type";
private static final String MESSAGE_PARAM = "spargel.message-type";
private VertexUpdateFunction<K, V, M> vertexUpdateFunction;
// Reusable holders that the fields of the vertex record are read into.
private K vertexKey;
private V vertexValue;
private MessageIterator<M> messageIter;
// Invokes the update function when the vertex exists; otherwise fails with a message that
// names the missing target vertex when one can be read from the messages.
public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
if (vertex.hasNext()) {
Record first = vertex.next();
first.getFieldInto(0, vertexKey);
first.getFieldInto(1, vertexValue);
vertexUpdateFunction.setOutput(first, out);
vertexUpdateFunction.updateVertex(vertexKey, vertexValue, messageIter);
} else {
if (messages.hasNext()) {
String message = "Target vertex does not exist!.";
try {
Record next = messages.next();
next.getFieldInto(0, vertexKey);
message = "Target vertex '" + vertexKey + "' does not exist!.";
// Deliberately ignored: reading the key only enriches the error text, and the
// generic message above is thrown when that attempt fails.
} catch (Throwable ignored) {}
throw new Exception(message);
} else {
// neither input holds an element; report that instead of an exception without text
throw new Exception("CoGroup was invoked with neither a vertex nor any messages.");
// Instantiates the holders and deserializes the user function from the parameters.
public void open(Configuration parameters) throws Exception {
// instantiate only the first time
if (vertexUpdateFunction == null) {
ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, cl);
vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
messageIter = new MessageIterator<M>(InstantiationUtil.instantiate(messageClass, Value.class));
ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader();
try {
this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl);
} catch (Exception e) {
String message = e.getMessage() == null ? "." : ": " + e.getMessage();
throw new Exception("Could not instantiate VertexUpdateFunction" + message, e);
public void close() throws Exception {
// Co-group function that feeds a vertex's state and its edges to the user's MessagingFunction.
public static final class MessagingDriver<K extends Key<K>, V extends Value, M extends Value, E extends Value> extends CoGroupFunction {
private static final long serialVersionUID = 1L;
// Configuration keys under which the constructor above stores the UDF and its types.
private static final String UDF_PARAM = "spargel.udf";
private static final String KEY_PARAM = "spargel.key-type";
private static final String VALUE_PARAM = "spargel.value-type";
private static final String MESSAGE_PARAM = "spargel.message-type";
private static final String EDGE_PARAM = "spargel.edge-value";
private MessagingFunction<K, V, M, E> messagingFunction;
// Reusable holders that the fields of the state record are read into.
private K vertexKey;
private V vertexValue;
// Invokes the messaging function for the vertex; nothing happens when no state record exists.
public void coGroup(Iterator<Record> edges, Iterator<Record> state, Collector<Record> out) throws Exception {
if (state.hasNext()) {
Record first = state.next();
first.getFieldInto(0, vertexKey);
first.getFieldInto(1, vertexValue);
messagingFunction.set(edges, out);
messagingFunction.sendMessages(vertexKey, vertexValue);
// Instantiates the holders, deserializes the user function, and initializes it with the
// iteration runtime context and the edge holders.
public void open(Configuration parameters) throws Exception {
// instantiate only the first time
if (messagingFunction == null) {
ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
// Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, cl);
vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
K edgeKeyHolder = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
E edgeValueHolder = InstantiationUtil.instantiate(edgeClass, Value.class);
ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader();
try {
this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl);
} catch (Exception e) {
String message = e.getMessage() == null ? "." : ": " + e.getMessage();
throw new Exception("Could not instantiate MessagingFunction" + message, e);
this.messagingFunction.init(getIterationRuntimeContext(), edgeKeyHolder, edgeValueHolder);
public void close() throws Exception {
package org.apache.flink.spargel.java.record;
import java.io.Serializable;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
/**
 * @param <VertexKey> The vertex key type.
 * @param <VertexValue> The vertex value type.
 * @param <Message> The message type.
 */
// Base class for the user-defined function that computes a vertex's new value from the
// messages the vertex received.
public abstract class VertexUpdateFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value> implements Serializable {
// --------------------------------------------------------------------------------------------
// Public API Methods
// --------------------------------------------------------------------------------------------
// Implemented by subclasses; receives the vertex key, its current value, and the incoming messages.
public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
// Optional hooks; all three are empty by default and may be overridden.
public void setup(Configuration config) throws Exception {}
public void preSuperstep() throws Exception {}
public void postSuperstep() throws Exception {}
// Writes the new value into field 1 of the record installed by setOutput().
public void setNewVertexValue(VertexValue newValue) {
outVal.setField(1, newValue);
// Returns the superstep number reported by the iteration runtime context.
public int getSuperstep() {
return this.runtimeContext.getSuperstepNumber();
// Looks up an iteration aggregator by name in the runtime context.
public <T extends Aggregator<?>> T getIterationAggregator(String name) {
return this.runtimeContext.<T>getIterationAggregator(name);
// Looks up the previous iteration's aggregate by name in the runtime context.
public <T extends Value> T getPreviousIterationAggregate(String name) {
return this.runtimeContext.<T>getPreviousIterationAggregate(name);
// --------------------------------------------------------------------------------------------
// internal methods
// --------------------------------------------------------------------------------------------
private IterationRuntimeContext runtimeContext;
private Collector<Record> out;
// Record that setNewVertexValue() modifies.
private Record outVal;
// Stores the iteration runtime context used by the accessor methods above.
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
// Installs the record to modify and the collector for the vertex being processed.
void setOutput(Record val, Collector<Record> out) {
this.out = out;
this.outVal = val;
// serializability
private static final long serialVersionUID = 1L;
package org.apache.flink.test.spargel;
import java.io.BufferedReader;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.spargel.java.VertexCentricIteration;
import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
// Integration test that runs the Spargel connected components example on randomly
// generated edges and writes the result to a temporary path.
public class SpargelConnectedComponentsITCase extends JavaProgramTestBase {
// Fixed seed passed to the edge generator.
private static final long SEED = 9487520347802987L;
private static final int NUM_VERTICES = 1000;
private static final int NUM_EDGES = 10000;
// Output location, assigned in preSubmit().
private String resultPath;
// Reserves a temporary file path for the results.
protected void preSubmit() throws Exception {
resultPath = getTempFilePath("results");
// Builds the vertex and edge data sets, runs the vertex-centric iteration with a maximum
// of 100 iterations, and writes the result as CSV.
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
// the generator returns one newline-separated string; each line becomes one element
DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
DataSet<Tuple2<Long, Long>> edges = edgeString.map(new EdgeParser());
DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
// rows are separated by "\n" and fields by a single space
result.writeAsCsv(resultPath, "\n", " ");
env.execute("Spargel Connected Components");
// Iterates over the readers of the written result.
protected void postSubmit() throws Exception {
for (BufferedReader reader : getResultReader(resultPath)) {
// Parses a line of two space-separated numbers into a (long, long) edge tuple.
public static final class EdgeParser extends RichMapFunction<String, Tuple2<Long, Long>> {
public Tuple2<Long, Long> map(String value) {
String[] nums = value.split(" ");
return new Tuple2<Long, Long>(Long.parseLong(nums[0]), Long.parseLong(nums[1]));
