提交 e2f28d47 编写于 作者: V vasia

[FLINK-1523] [gelly] added getIn/Out degrees methods in update and messaging functions;

    deleted VertexWithValue type;
    deleted InaccessibleMethodException; if the options are not set, -1 is returned;
    added missing javadocs;
    added tests;
    renamed type parameters VertexKey -> K, VertexValue -> VV, EdgeValue -> EV.
上级 e1720673
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph;
/**
* The exception that gets thrown when the degree option or the number of vertices
* option in {@link org.apache.flink.graph.spargel.IterationConfiguration} was not set.
*/
public class InaccessibleMethodException extends Exception {
public InaccessibleMethodException() {}
public InaccessibleMethodException(String text) {
super(text);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph;
import java.io.Serializable;
/**
* Represents the graph's nodes. It carries an ID and a value as well as the vertex inDegree and outDegree.
* For vertices with no value, use {@link org.apache.flink.types.NullValue} as the value type.
*
* @param <K>
* @param <V>
*/
public class VertexWithDegrees<K extends Comparable<K> & Serializable, V extends Serializable>
extends Vertex<K, V> {
private long inDegree;
private long outDegree;
public VertexWithDegrees() {
super();
inDegree = -1l;
outDegree = -1l;
}
public VertexWithDegrees(K k, V v) {
super(k,v);
inDegree = 0l;
outDegree = 0l;
}
public Long getInDegree() throws Exception{
if(inDegree == -1) {
throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
"call iterationConfiguration.setOptDegrees(true).");
}
return inDegree;
}
public void setInDegree(Long inDegree) {
this.inDegree = inDegree;
}
public Long getOutDegree() throws Exception{
if(outDegree == -1) {
throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
"call iterationConfiguration.setOptDegrees(true).");
}
return outDegree;
}
public void setOutDegree(Long outDegree) {
this.outDegree = outDegree;
}
}
......@@ -26,11 +26,10 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.graph.example.utils.IncrementalSSSPData;
import org.apache.flink.graph.spargel.IterationConfiguration;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
......@@ -95,7 +94,7 @@ public class IncrementalSSSPExample implements ProgramDescription {
graph.removeEdge(edgeToBeRemoved);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
......@@ -160,7 +159,7 @@ public class IncrementalSSSPExample implements ProgramDescription {
@Override
public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
if (inMessages.hasNext()) {
Long outDegree = ((VertexWithDegrees)vertex).getOutDegree() - 1;
Long outDegree = getOutDegree() - 1;
// check if the vertex has another SP-Edge
if (outDegree > 0) {
// there is another shortest path from the source to this vertex
......
......@@ -20,6 +20,7 @@ package org.apache.flink.graph.library;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
......@@ -60,7 +61,7 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long,
public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Long id, Long currentMin, MessageIterator<Long> messages) throws Exception {
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> messages) throws Exception {
long min = Long.MAX_VALUE;
for (long msg : messages) {
......@@ -68,7 +69,7 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long,
}
// update vertex value, if new minimum
if (min < currentMin) {
if (min < vertex.getValue()) {
setNewVertexValue(min);
}
}
......@@ -80,9 +81,9 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long,
public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
@Override
public void sendMessages(Long id, Long currentMin) throws Exception {
public void sendMessages(Vertex<Long, Long> vertex) throws Exception {
// send current minimum to neighbors
sendMessageToAllNeighbors(currentMin);
sendMessageToAllNeighbors(vertex.getValue());
}
}
}
......@@ -26,24 +26,21 @@ import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.InaccessibleMethodException;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
/**
* The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
*
* @param <VertexKey> The type of the vertex key (the vertex identifier).
* @param <VertexValue> The type of the vertex value (the state of the vertex).
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
* @param <EdgeValue> The type of the values that are associated with the edges.
* @param <EV> The type of the values that are associated with the edges.
*/
public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> implements Serializable {
public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable {
private static final long serialVersionUID = 1L;
......@@ -54,11 +51,12 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
private long numberOfVertices = -1L;
public long getNumberOfVertices() throws Exception{
if (numberOfVertices == -1) {
throw new InaccessibleMethodException("The number of vertices option is not set. " +
"To access the number of vertices, call iterationConfiguration.setOptNumVertices(true).");
}
/**
* Retrieves the number of vertices in the graph.
* @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
* option has been set; -1 otherwise.
*/
public long getNumberOfVertices() {
return numberOfVertices;
}
......@@ -73,11 +71,15 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
private EdgeDirection direction;
/**
* Retrieves the edge direction in which messages are propagated in the vertex-centric iteration.
* @return the messaging {@link EdgeDirection}
*/
public EdgeDirection getDirection() {
return direction;
}
public void setDirection(EdgeDirection direction) {
void setDirection(EdgeDirection direction) {
this.direction = direction;
}
......@@ -93,7 +95,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
*
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
*/
public abstract void sendMessages(Vertex<VertexKey, VertexValue> vertex) throws Exception;
public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
/**
* This method is executed one per superstep before the vertex update function is invoked for each vertex.
......@@ -117,12 +119,12 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
* @return An iterator with all outgoing edges.
*/
@SuppressWarnings("unchecked")
public Iterable<Edge<VertexKey, EdgeValue>> getEdges() {
public Iterable<Edge<K, EV>> getEdges() {
if (edgesUsed) {
throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
}
edgesUsed = true;
this.edgeIterator.set((Iterator<Edge<VertexKey, EdgeValue>>) edges);
this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
return this.edgeIterator;
}
......@@ -143,7 +145,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
while (edges.hasNext()) {
Tuple next = (Tuple) edges.next();
VertexKey k = next.getField(1);
K k = next.getField(1);
outValue.f0 = k;
out.collect(outValue);
}
......@@ -156,7 +158,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
* @param target The key (id) of the target vertex to message.
* @param m The message.
*/
public void sendMessageTo(VertexKey target, Message m) {
public void sendMessageTo(K target, Message m) {
outValue.f0 = target;
outValue.f1 = m;
out.collect(outValue);
......@@ -210,39 +212,42 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
// internal methods and state
// --------------------------------------------------------------------------------------------
private Tuple2<VertexKey, Message> outValue;
private Tuple2<K, Message> outValue;
private IterationRuntimeContext runtimeContext;
private Iterator<?> edges;
private Collector<Tuple2<VertexKey, Message>> out;
private Collector<Tuple2<K, Message>> out;
private EdgesIterator<VertexKey, EdgeValue> edgeIterator;
private EdgesIterator<K, EV> edgeIterator;
private boolean edgesUsed;
private long inDegree = -1;
private long outDegree = -1;
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
this.outValue = new Tuple2<VertexKey, Message>();
this.edgeIterator = new EdgesIterator<VertexKey, EdgeValue>();
this.outValue = new Tuple2<K, Message>();
this.edgeIterator = new EdgesIterator<K, EV>();
}
void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out) {
this.edges = edges;
this.out = out;
this.edgesUsed = false;
}
private static final class EdgesIterator<VertexKey, EdgeValue>
implements Iterator<Edge<VertexKey, EdgeValue>>, Iterable<Edge<VertexKey, EdgeValue>>
private static final class EdgesIterator<K, EV>
implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
{
private Iterator<Edge<VertexKey, EdgeValue>> input;
private Iterator<Edge<K, EV>> input;
private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
private Edge<K, EV> edge = new Edge<K, EV>();
void set(Iterator<Edge<VertexKey, EdgeValue>> input) {
void set(Iterator<Edge<K, EV>> input) {
this.input = input;
}
......@@ -252,8 +257,8 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
}
@Override
public Edge<VertexKey, EdgeValue> next() {
Edge<VertexKey, EdgeValue> next = input.next();
public Edge<K, EV> next() {
Edge<K, EV> next = input.next();
edge.setSource(next.f0);
edge.setTarget(next.f1);
edge.setValue(next.f2);
......@@ -265,28 +270,34 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
throw new UnsupportedOperationException();
}
@Override
public Iterator<Edge<VertexKey, EdgeValue>> iterator() {
public Iterator<Edge<K, EV>> iterator() {
return this;
}
}
/**
* In order to hide the Tuple3(actualValue, inDegree, outDegree) vertex value from the user,
* another function will be called from {@link org.apache.flink.graph.spargel.VertexCentricIteration}.
*
* This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
* the regular sendMessages function.
*
* @param newVertexState
* @throws Exception
* Retrieves the vertex in-degree (number of in-coming edges).
* @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
* option has been set; -1 otherwise.
*/
void sendMessagesFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState)
throws Exception {
VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey, VertexValue>(newVertexState.getId(),
newVertexState.getValue().f0);
vertex.setInDegree(newVertexState.getValue().f1);
vertex.setOutDegree(newVertexState.getValue().f2);
public long getInDegree() {
return inDegree;
}
void setInDegree(long inDegree) {
this.inDegree = inDegree;
}
/**
* Retrieve the vertex out-degree (number of out-going edges).
* @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
* option has been set; -1 otherwise.
*/
public long getOutDegree() {
return outDegree;
}
sendMessages(vertex);
void setOutDegree(long outDegree) {
this.outDegree = outDegree;
}
}
......@@ -29,10 +29,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
......@@ -43,7 +41,6 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.util.Collector;
import com.google.common.base.Preconditions;
......@@ -72,34 +69,33 @@ import com.google.common.base.Preconditions;
* Vertex-centric graph iterations are are run by calling
* {@link Graph#runVertexCentricIteration(VertexUpdateFunction, MessagingFunction, int)}.
*
* @param <VertexKey> The type of the vertex key (the vertex identifier).
* @param <VertexValue> The type of the vertex value (the state of the vertex).
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
* @param <EdgeValue> The type of the values that are associated with the edges.
* @param <EV> The type of the values that are associated with the edges.
*/
public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
public class VertexCentricIteration<K, VV, Message, EV>
implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
{
private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
private final VertexUpdateFunction<K, VV, Message> updateFunction;
private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
private final MessagingFunction<K, VV, Message, EV> messagingFunction;
private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
private final DataSet<Edge<K, EV>> edgesWithValue;
private final int maximumNumberOfIterations;
private final TypeInformation<Message> messageType;
private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
private DataSet<Vertex<K, VV>> initialVertices;
private VertexCentricConfiguration configuration;
private DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees;
// ----------------------------------------------------------------------------------
private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue,
private VertexCentricIteration(VertexUpdateFunction<K, VV, Message> uf,
MessagingFunction<K, VV, Message, EV> mf,
DataSet<Edge<K, EV>> edgesWithValue,
int maximumNumberOfIterations)
{
Preconditions.checkNotNull(uf);
......@@ -114,7 +110,7 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
this.messageType = getMessageType(mf);
}
private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
}
......@@ -132,7 +128,7 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
*/
@Override
public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
public void setInput(DataSet<Vertex<K, VV>> inputData) {
this.initialVertices = inputData;
}
......@@ -142,17 +138,17 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @return The operator that represents this vertex-centric graph computation.
*/
@Override
public DataSet<Vertex<VertexKey, VertexValue>> createResult() {
public DataSet<Vertex<K, VV>> createResult() {
if (this.initialVertices == null) {
throw new IllegalStateException("The input data set has not been set.");
}
// prepare some type information
TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
// create a graph
Graph<VertexKey, VertexValue, EdgeValue> graph =
Graph<K, VV, EV> graph =
Graph.fromDataSet(initialVertices, edgesWithValue, ExecutionEnvironment.getExecutionEnvironment());
// check whether the numVertices option is set and, if so, compute the total number of vertices
......@@ -194,21 +190,21 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param uf The function that updates the state of the vertices from the incoming messages.
* @param mf The function that turns changed vertex states into messages along the edges.
*
* @param <VertexKey> The type of the vertex key (the vertex identifier).
* @param <VertexValue> The type of the vertex value (the state of the vertex).
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
* @param <EdgeValue> The type of the values that are associated with the edges.
* @param <EV> The type of the values that are associated with the edges.
*
* @return An in stance of the vertex-centric graph computation operator.
*/
public static final <VertexKey, VertexValue, Message, EdgeValue>
VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withEdges(
DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue,
VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
public static final <K, VV, Message, EV>
VertexCentricIteration<K, VV, Message, EV> withEdges(
DataSet<Edge<K, EV>> edgesWithValue,
VertexUpdateFunction<K, VV, Message> uf,
MessagingFunction<K, VV, Message, EV> mf,
int maximumNumberOfIterations)
{
return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations);
return new VertexCentricIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations);
}
/**
......@@ -231,21 +227,21 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
// Wrapping UDFs
// --------------------------------------------------------------------------------------------
private static final class VertexUpdateUdf<VertexKey, VertexValue, Message>
extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
{
private static final long serialVersionUID = 1L;
final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
final MessageIterator<Message> messageIter = new MessageIterator<Message>();
private transient TypeInformation<Vertex<VertexKey, VV>> resultType;
private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
TypeInformation<Vertex<VertexKey, VV>> resultType)
private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction,
TypeInformation<Vertex<K, VVWithDegrees>> resultType)
{
this.vertexUpdateFunction = vertexUpdateFunction;
this.resultType = resultType;
......@@ -265,27 +261,26 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
}
@Override
public TypeInformation<Vertex<VertexKey, VV>> getProducedType() {
public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
return this.resultType;
}
}
private static final class VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>
extends VertexUpdateUdf<VertexKey, VertexValue, Message> {
@SuppressWarnings("serial")
private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
private VertexUpdateUdfSimpleVertexValue(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, VertexValue>> resultType) {
private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
super(vertexUpdateFunction, resultType);
}
@Override
public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
Iterable<Vertex<VertexKey, VertexValue>> vertex,
Collector<Vertex<VertexKey, VertexValue>> out) throws Exception {
final Iterator<Vertex<VertexKey, VertexValue>> vertexIter = vertex.iterator();
public void coGroup(Iterable<Tuple2<K, Message>> messages,
Iterable<Vertex<K, VV>> vertex,
Collector<Vertex<K, VV>> out) throws Exception {
final Iterator<Vertex<K, VV>> vertexIter = vertex.iterator();
if (vertexIter.hasNext()) {
Vertex<VertexKey, VertexValue> vertexState = vertexIter.next();
Vertex<K, VV> vertexState = vertexIter.next();
@SuppressWarnings("unchecked")
Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
......@@ -295,11 +290,11 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
vertexUpdateFunction.updateVertex(vertexState, messageIter);
}
else {
final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
if (messageIter.hasNext()) {
String message = "Target vertex does not exist!.";
try {
Tuple2<VertexKey, Message> next = messageIter.next();
Tuple2<K, Message> next = messageIter.next();
message = "Target vertex '" + next.f0 + "' does not exist!.";
} catch (Throwable t) {}
throw new Exception(message);
......@@ -310,36 +305,39 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
}
}
private static final class VertexUpdateUdfVertexValueWithDegrees<VertexKey, VertexValue, Message> extends VertexUpdateUdf<VertexKey,
Tuple3<VertexValue, Long, Long>, VertexValue, Message> {
@SuppressWarnings("serial")
private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
private VertexUpdateUdfVertexValueWithDegrees(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> resultType) {
private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
super(vertexUpdateFunction, resultType);
}
@Override
public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertex,
Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexIter = vertex.iterator();
public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
if (vertexIter.hasNext()) {
Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState = vertexIter.next();
Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
@SuppressWarnings("unchecked")
Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
messageIter.setSource(downcastIter);
vertexUpdateFunction.setOutputWithDegrees(vertexState, out);
vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexState, messageIter);
vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexWithDegrees, messageIter);
}
else {
final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
if (messageIter.hasNext()) {
String message = "Target vertex does not exist!.";
try {
Tuple2<VertexKey, Message> next = messageIter.next();
Tuple2<K, Message> next = messageIter.next();
message = "Target vertex '" + next.f0 + "' does not exist!.";
} catch (Throwable t) {}
throw new Exception(message);
......@@ -353,19 +351,19 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
/*
* UDF that encapsulates the message sending function for graphs where the edges have an associated value.
*/
private static final class MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>
extends RichCoGroupFunction<Edge<VertexKey, EdgeValue>, Vertex<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
implements ResultTypeQueryable<Tuple2<K, Message>>
{
private static final long serialVersionUID = 1L;
final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
final MessagingFunction<K, VV, Message, EV> messagingFunction;
private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
TypeInformation<Tuple2<VertexKey, Message>> resultType)
private transient TypeInformation<Tuple2<K, Message>> resultType;
private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction,
TypeInformation<Tuple2<K, Message>> resultType)
{
this.messagingFunction = messagingFunction;
this.resultType = resultType;
......@@ -386,54 +384,62 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
}
@Override
public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
public TypeInformation<Tuple2<K, Message>> getProducedType() {
return this.resultType;
}
}
private static final class MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>
extends MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> {
@SuppressWarnings("serial")
private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
private MessagingUdfWithEdgeValuesSimpleVertexValue(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
TypeInformation<Tuple2<VertexKey, Message>> resultType) {
private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
TypeInformation<Tuple2<K, Message>> resultType) {
super(messagingFunction, resultType);
}
@Override
public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
Iterable<Vertex<VertexKey, VertexValue>> state,
Collector<Tuple2<VertexKey, Message>> out) throws Exception {
final Iterator<Vertex<VertexKey, VertexValue>> stateIter = state.iterator();
public void coGroup(Iterable<Edge<K, EV>> edges,
Iterable<Vertex<K, VV>> state,
Collector<Tuple2<K, Message>> out) throws Exception {
final Iterator<Vertex<K, VV>> stateIter = state.iterator();
if (stateIter.hasNext()) {
Vertex<VertexKey, VertexValue> newVertexState = stateIter.next();
Vertex<K, VV> newVertexState = stateIter.next();
messagingFunction.set((Iterator<?>) edges.iterator(), out);
messagingFunction.sendMessages(newVertexState);
}
}
}
private static final class MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>
extends MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> {
@SuppressWarnings("serial")
private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
private MessagingUdfWithEdgeValuesVertexValueWithDegrees
(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
TypeInformation<Tuple2<VertexKey, Message>> resultType) {
private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
TypeInformation<Tuple2<K, Message>> resultType) {
super(messagingFunction, resultType);
}
@Override
public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> state,
Collector<Tuple2<VertexKey, Message>> out) throws Exception {
final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> stateIter = state.iterator();
public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
Collector<Tuple2<K, Message>> out) throws Exception {
final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
if (stateIter.hasNext()) {
Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState = stateIter.next();
Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
nextVertex.setField(vertexWithDegrees.f0, 0);
nextVertex.setField(vertexWithDegrees.f1.f0, 1);
messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
messagingFunction.set((Iterator<?>) edges.iterator(), out);
messagingFunction.sendMessagesFromVertexCentricIteration(newVertexState);
messagingFunction.sendMessages(nextVertex);
}
}
}
......@@ -454,15 +460,14 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param equalToArg the argument for the equalTo within the coGroup
* @return the messaging function
*/
private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunction(
DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration,
TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
// build the messaging function (co group)
CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> messenger =
new MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>(
messagingFunction, messageTypeInfo);
CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
.equalTo(equalToArg).with(messenger);
......@@ -489,16 +494,14 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param equalToArg the argument for the equalTo within the coGroup
* @return the messaging function
*/
private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunctionVerticesWithDegrees(
DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration,
TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
// build the messaging function (co group)
CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> messenger =
new MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>(
messagingFunction, messageTypeInfo);
CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
.equalTo(equalToArg).with(messenger);
......@@ -518,17 +521,10 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
/**
* Helper method which sets up an iteration with the given vertex value(either simple or with degrees)
*
* @param vertices
* @param <VV>
* @param iteration
*/
private <VV> DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> setUpIteration(
DataSet<Vertex<VertexKey, VV>> vertices) {
final int[] zeroKeyPos = new int[] {0};
final DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> iteration =
vertices.iterateDelta(vertices, this.maximumNumberOfIterations, zeroKeyPos);
private void setUpIteration(DeltaIteration<?, ?> iteration) {
// set up the iteration operator
if (this.configuration != null) {
......@@ -546,8 +542,6 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
// no configuration provided; set default name
iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
}
return iteration;
}
/**
......@@ -557,14 +551,16 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param messageTypeInfo
* @return the operator
*/
private DataSet<Vertex<VertexKey, VertexValue>> createResultSimpleVertex(EdgeDirection messagingDirection,
TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
DataSet<Tuple2<VertexKey, Message>> messages;
private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
DataSet<Tuple2<K, Message>> messages;
TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
TypeInformation<Vertex<K, VV>> vertexTypes = initialVertices.getType();
final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
setUpIteration(this.initialVertices);
final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0);
setUpIteration(iteration);
switch (messagingDirection) {
case IN:
......@@ -581,11 +577,11 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
throw new IllegalArgumentException("Illegal edge direction");
}
VertexUpdateUdf<VertexKey, VertexValue, VertexValue, Message> updateUdf =
new VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
VertexUpdateUdf<K, VV, Message> updateUdf =
new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
// build the update function (co group)
CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
CoGroupOperator<?, ?, Vertex<K, VV>> updates =
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
configureUpdateFunction(updates);
......@@ -602,46 +598,44 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param messageTypeInfo
* @return the operator
*/
private DataSet<Vertex<VertexKey, VertexValue>> createResultVerticesWithDegrees(
Graph<VertexKey, VertexValue, EdgeValue> graph,
EdgeDirection messagingDirection,
TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
@SuppressWarnings("serial")
private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
DataSet<Tuple2<VertexKey, Message>> messages;
DataSet<Tuple2<K, Message>> messages;
this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
DataSet<Tuple2<VertexKey, Long>> inDegrees = graph.inDegrees();
DataSet<Tuple2<VertexKey, Long>> outDegrees = graph.outDegrees();
DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
DataSet<Tuple3<VertexKey, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
.with(new FlatJoinFunction<Tuple2<VertexKey, Long>, Tuple2<VertexKey, Long>, Tuple3<VertexKey, Long, Long>>() {
DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
.with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
@Override
public void join(Tuple2<VertexKey, Long> first, Tuple2<VertexKey, Long> second, Collector<Tuple3<VertexKey, Long, Long>> out) throws Exception {
out.collect(new Tuple3<VertexKey, Long, Long>(first.f0, first.f1, second.f1));
public void join(Tuple2<K, Long> first, Tuple2<K, Long> second, Collector<Tuple3<K, Long, Long>> out) {
out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
}
});
}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees= initialVertices
DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
.join(degrees).where(0).equalTo(0)
.with(new FlatJoinFunction<Vertex<VertexKey,VertexValue>, Tuple3<VertexKey,Long,Long>, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>>() {
.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
@Override
public void join(Vertex<VertexKey, VertexValue> vertex,
Tuple3<VertexKey, Long, Long> degrees,
Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
out.collect(new VertexWithDegrees<VertexKey, Tuple3<VertexValue, Long, Long>>(vertex.getId(),
new Tuple3<VertexValue, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
}
});
}).withForwardedFieldsFirst("f0");
// add type info
TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
final DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration =
setUpIteration(verticesWithDegrees);
final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration =
verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
setUpIteration(iteration);
switch (messagingDirection) {
case IN:
......@@ -658,24 +652,26 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
throw new IllegalArgumentException("Illegal edge direction");
}
VertexUpdateUdf<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message> updateUdf =
new VertexUpdateUdfVertexValueWithDegrees<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
@SuppressWarnings({ "unchecked", "rawtypes" })
VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
// build the update function (co group)
CoGroupOperator<?, ?, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> updates =
CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
configureUpdateFunction(updates);
return iteration.closeWith(updates, updates).map(new MapFunction<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>, Vertex<VertexKey, VertexValue>>() {
@Override
public Vertex<VertexKey, VertexValue> map(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertex) throws Exception {
return new Vertex<VertexKey, VertexValue>(vertex.getId(), vertex.getValue().f0);
}
});
return iteration.closeWith(updates, updates).map(
new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
}
});
}
private <VV> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<VertexKey, VV>> updates) {
private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates) {
// configure coGroup update function with name and broadcast variables
updates = updates.name("Vertex State Updates");
......@@ -688,4 +684,4 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
// let the operator know that we preserve the key field
updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
}
}
}
\ No newline at end of file
......@@ -24,9 +24,7 @@ import java.util.Collection;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.InaccessibleMethodException;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
......@@ -35,11 +33,11 @@ import org.apache.flink.util.Collector;
* incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is
* invoked once per vertex per superstep.
*
* <VertexKey> The vertex key type.
* <VertexValue> The vertex value type.
* <K> The vertex key type.
* <VV> The vertex value type.
* <Message> The message type.
*/
public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> implements Serializable {
public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable {
private static final long serialVersionUID = 1L;
......@@ -50,11 +48,12 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
private long numberOfVertices = -1L;
public long getNumberOfVertices() throws Exception{
if (numberOfVertices == -1) {
throw new InaccessibleMethodException("The number of vertices option is not set. " +
"To access the number of vertices, call iterationConfiguration.setOptNumVertices(true).");
}
/**
* Retrieves the number of vertices in the graph.
* @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
* option has been set; -1 otherwise.
*/
public long getNumberOfVertices() {
return numberOfVertices;
}
......@@ -66,7 +65,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
private boolean optDegrees;
public boolean isOptDegrees() {
boolean isOptDegrees() {
return optDegrees;
}
......@@ -80,7 +79,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
/**
* This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
* the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
* the incoming messages. It may set a new vertex state via {@link #setNewVV(Object)}. If the vertex
* state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
*
* @param vertex The vertex.
......@@ -88,7 +87,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
*
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
*/
public abstract void updateVertex(Vertex<VertexKey, VertexValue> vertex, MessageIterator<Message> inMessages) throws Exception;
public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
/**
* This method is executed one per superstep before the vertex update function is invoked for each vertex.
......@@ -109,7 +108,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
*
* @param newValue The new vertex value.
*/
public void setNewVertexValue(VertexValue newValue) {
public void setNewVertexValue(VV newValue) {
if(isOptDegrees()) {
outValWithDegrees.f1.f0 = newValue;
outWithDegrees.collect(outValWithDegrees);
......@@ -167,30 +166,58 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
private IterationRuntimeContext runtimeContext;
private Collector<Vertex<VertexKey, VertexValue>> out;
private Collector<Vertex<K, VV>> out;
private Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees;
private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
private Vertex<VertexKey, VertexValue> outVal;
private Vertex<K, VV> outVal;
private Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees;
private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
private long inDegree = -1;
private long outDegree = -1;
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
}
void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
this.outVal = outVal;
this.out = out;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
<ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
Collector out) {
this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
this.outWithDegrees = out;
}
void setOutputWithDegrees(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees,
Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees) {
this.outValWithDegrees = outValWithDegrees;
this.outWithDegrees = outWithDegrees;
/**
* Retrieves the vertex in-degree (number of in-coming edges).
* @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
* option has been set; -1 otherwise.
*/
public long getInDegree() {
return inDegree;
}
void setOutput(Vertex<VertexKey, VertexValue> outVal, Collector<Vertex<VertexKey, VertexValue>> out) {
this.outVal = outVal;
this.out = out;
void setInDegree(long inDegree) {
this.inDegree = inDegree;
}
/**
* Retrieve the vertex out-degree (number of out-going edges).
* @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
* option has been set; -1 otherwise.
*/
public long getOutDegree() {
return outDegree;
}
void setOutDegree(long outDegree) {
this.outDegree = outDegree;
}
/**
......@@ -204,12 +231,12 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
* @param inMessages
* @throws Exception
*/
void updateVertexFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState,
@SuppressWarnings("unchecked")
<VertexWithDegree> void updateVertexFromVertexCentricIteration(Vertex<K, VertexWithDegree> vertexState,
MessageIterator<Message> inMessages) throws Exception {
VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey, VertexValue>(vertexState.getId(),
vertexState.getValue().f0);
vertex.setInDegree(vertexState.getValue().f1);
vertex.setOutDegree(vertexState.getValue().f2);
Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
updateVertex(vertex, inMessages);
}
......
......@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
......
......@@ -31,7 +31,6 @@ import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.GatherSumApplyIteration;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.IterationConfiguration;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.junit.After;
......
......@@ -26,13 +26,10 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.IterationConfiguration;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricConfiguration;
......@@ -48,6 +45,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.flink.graph.utils.VertexToTuple2Map;
@RunWith(Parameterized.class)
public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
......@@ -88,6 +86,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6));
parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
parameters.setOptNumVertices(true);
Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
new UpdateFunction(), new MessageFunction(), 10, parameters);
......@@ -135,6 +134,29 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
"5 15";
}
@Test
public void testDefaultConfiguration() throws Exception {
/*
* Test Graph's runVertexCentricIteration when configuration parameters are not provided
* i.e. degrees and numVertices will be -1, EdgeDirection will be OUT.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
new UpdateFunctionDefault(), new MessageFunctionDefault(), 5);
result.getVertices().map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 6\n" +
"2 6\n" +
"3 6\n" +
"4 6\n" +
"5 6";
}
@Test
public void testIterationDefaultDirection() throws Exception {
......@@ -176,7 +198,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
.mapVertices(new InitialiseHashSetMapper());
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setDirection(EdgeDirection.IN);
......@@ -208,7 +230,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
.mapVertices(new InitialiseHashSetMapper());
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setDirection(EdgeDirection.ALL);
......@@ -226,12 +248,37 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
"5 [1, 3, 4]";
}
@Test
public void testNumVerticesNotSet() throws Exception {
/*
* Test that if the number of vertices option is not set, -1 is returned as value.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
new DummyMessageFunction(), 2).getVertices();
verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 -1\n" +
"2 -1\n" +
"3 -1\n" +
"4 -1\n" +
"5 -1";
}
@Test
public void testInDegreesSet() throws Exception {
/*
* Test that if the degrees are set, the in degrees can be accessed in every superstep and the value
* is correctly computed.
* Test that if the degrees are set, they can be accessed in every superstep
* inside the update function and the value
* is correctly computed for degrees in the messaging function.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
......@@ -239,14 +286,14 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setOptDegrees(true);
DataSet<Vertex<Long, Long>> verticesWithInDegree = graph.runVertexCentricIteration(new UpdateFunctionInDegree(),
new DummyMessageFunction(), 5, parameters).getVertices();
DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
verticesWithInDegree.writeAsCsv(resultPath, "\n", "\t");
verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 1\n" +
......@@ -257,41 +304,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@Test
public void testOutDegreesSet() throws Exception {
public void testInDegreesNotSet() throws Exception {
/*
* Test that if the degrees are set, the out degrees can be accessed in every superstep and the value
* is correctly computed.
* Test that if the degrees option is not set, then -1 is returned as a value for in-degree.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
parameters.setOptDegrees(true);
DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
DataSet<Vertex<Long, Long>> verticesWithOutDegree = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(),
new DummyMessageFunction(), 5, parameters).getVertices();
verticesWithOutDegree.writeAsCsv(resultPath, "\n", "\t");
verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 2\n" +
"2 1\n" +
"3 2\n" +
"4 1\n" +
"5 1";
expectedResult = "1 -1\n" +
"2 -1\n" +
"3 -1\n" +
"4 -1\n" +
"5 -1";
}
@Test
public void testNumVerticesSet() throws Exception {
public void testOutDegreesSet() throws Exception {
/*
* Test that if the number of vertices option is set, it can be accessed in every superstep and the value
* is correctly computed.
* Test that if the degrees are set, they can be accessed in every superstep
* inside the update function and the value
* is correctly computed for degrees in the messaging function.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
......@@ -299,56 +341,45 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setOptNumVertices(true);
parameters.setOptDegrees(true);
DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
new DummyMessageFunction(), 5, parameters).getVertices();
DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t");
verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 5\n" +
"2 5\n" +
"3 5\n" +
"4 5\n" +
"5 5";
expectedResult = "1 2\n" +
"2 1\n" +
"3 2\n" +
"4 1\n" +
"5 1";
}
@Test
public void testDegrees() throws Exception {
public void testOutDegreesNotSet() throws Exception {
/*
* Test that if the degrees are set, they can be accessed in every superstep and the value
* is correctly computed for both in and out degrees.
* Test that if the degrees option is not set, then -1 is returned as a value for out-degree.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Tuple3<Long, Long, Boolean>, Long> graph = Graph.fromCollection(TestGraphUtils.getLongVerticesWithDegrees(),
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
parameters.setOptDegrees(true);
DataSet<Vertex<Long, Tuple3<Long, Long, Boolean>>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionDegrees(), new DegreeMessageFunction(), 5, parameters).getVertices();
DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
verticesWithDegrees.map(new MapFunction<Vertex<Long,Tuple3<Long,Long,Boolean>>, Tuple2<Long, Boolean>>() {
@Override
public Tuple2<Long, Boolean> map(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex) throws Exception {
return new Tuple2<Long, Boolean>(vertex.getId(), vertex.getValue().f2);
}
}).writeAsCsv(resultPath, "\n", "\t");
verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 true\n" +
"2 true\n" +
"3 true\n" +
"4 true\n" +
"5 true";
expectedResult = "1 -1\n" +
"2 -1\n" +
"3 -1\n" +
"4 -1\n" +
"5 -1";
}
@Test
......@@ -364,7 +395,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setOptDegrees(true);
parameters.setDirection(EdgeDirection.ALL);
......@@ -399,12 +430,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
// test aggregator
aggregator = getIterationAggregator("superstepAggregator");
// test number of vertices
Assert.assertEquals(5, getNumberOfVertices());
}
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
long superstep = getSuperstepNumber();
aggregator.aggregate(superstep);
setNewVertexValue(vertex.getValue() + 1);
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionDefault extends VertexUpdateFunction<Long, Long, Long> {
LongSumAggregator aggregator = new LongSumAggregator();
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
// test number of vertices
Assert.assertEquals(-1, getNumberOfVertices());
// test degrees
Assert.assertEquals(-1, getInDegree());
Assert.assertEquals(-1, getOutDegree());
setNewVertexValue(vertex.getValue() + 1);
}
}
......@@ -421,6 +476,9 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
Assert.assertEquals(4, bcastSet.get(0));
Assert.assertEquals(5, bcastSet.get(1));
Assert.assertEquals(6, bcastSet.get(2));
// test number of vertices
Assert.assertEquals(5, getNumberOfVertices());
// test aggregator
if (getSuperstepNumber() == 2) {
......@@ -437,28 +495,18 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Long, Long, Long> {
public static final class MessageFunctionDefault extends MessagingFunction<Long, Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<Long, Long, Long> {
public void sendMessages(Vertex<Long, Long> vertex) {
// test number of vertices
Assert.assertEquals(-1, getNumberOfVertices());
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(((VertexWithDegrees) vertex).getOutDegree());
} catch (Exception e) {
e.printStackTrace();
}
// test degrees
Assert.assertEquals(-1, getInDegree());
Assert.assertEquals(-1, getOutDegree());
//send message to keep vertices active
sendMessageToAllNeighbors(vertex.getValue());
}
}
......@@ -467,11 +515,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(getNumberOfVertices());
} catch (Exception e) {
e.printStackTrace();
}
}
}
......@@ -495,8 +539,25 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>,
Long> {
public static final class DegreesMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
@Override
public void sendMessages(Vertex<Long, Long> vertex) {
if (vertex.getId().equals(1)) {
Assert.assertEquals(2, getOutDegree());
Assert.assertEquals(1, getInDegree());
}
else if(vertex.getId().equals(3)) {
Assert.assertEquals(2, getOutDegree());
Assert.assertEquals(2, getInDegree());
}
//send message to keep vertices active
sendMessageToAllNeighbors(vertex.getValue());
}
}
@SuppressWarnings("serial")
public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>, Long> {
@Override
public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception {
......@@ -510,6 +571,26 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionInDegrees extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
long inDegree = getInDegree();
setNewVertexValue(inDegree);
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionOutDegrees extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
long outDegree = getOutDegree();
setNewVertexValue(outDegree);
}
}
@SuppressWarnings("serial")
public static final class VertexUpdateNumNeighbors extends VertexUpdateFunction<Long, Boolean,
Long> {
......@@ -519,25 +600,21 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
long count = 0;
for(long msg : messages) {
for(@SuppressWarnings("unused") long msg : messages) {
count++;
}
setNewVertexValue(count == (((VertexWithDegrees)vertex).getInDegree() + ((VertexWithDegrees)vertex).getOutDegree()));
setNewVertexValue(count == (getInDegree() + getOutDegree()));
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Tuple3<Long, Long, Boolean>, Long> {
public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(new Tuple3(vertex.getValue().f0, vertex.getValue().f1, (((VertexWithDegrees)vertex).getInDegree() == vertex.getValue().f0)
&& (((VertexWithDegrees)vertex).getOutDegree() == vertex.getValue().f1) && vertex.getValue().f2));
} catch (Exception e) {
e.printStackTrace();
}
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
long inDegree = getInDegree();
long outDegree = getOutDegree();
setNewVertexValue(inDegree + outDegree);
}
}
......@@ -593,16 +670,6 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
}
@SuppressWarnings("serial")
public static final class DegreeMessageFunction extends MessagingFunction<Long, Tuple3<Long, Long, Boolean>, Long, Long> {
@Override
public void sendMessages(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex) {
//send message to keep vertices active
sendMessageToAllNeighbors(vertex.getValue().f0);
}
}
@SuppressWarnings("serial")
public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.test;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.fail;
public class VertexCentricConfigurationWithExceptionITCase {
private static final int PARALLELISM = 4;
private static ForkableFlinkMiniCluster cluster;
@BeforeClass
public static void setupCluster() {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
cluster = new ForkableFlinkMiniCluster(config, false);
}
catch (Exception e) {
e.printStackTrace();
fail("Error starting test cluster: " + e.getMessage());
}
}
@AfterClass
public static void tearDownCluster() {
try {
cluster.stop();
}
catch (Throwable t) {
t.printStackTrace();
fail("Cluster shutdown caused an exception: " + t.getMessage());
}
}
@Test
public void testOutDegreesNotSet() throws Exception {
/*
* Test that if the degrees are not set, the out degrees cannot be accessed - an
* exception is thrown.
*/
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
try {
DataSet<Vertex<Long, Long>> verticesWithOutDegrees = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(),
new DummyMessageFunction(), 5).getVertices();
verticesWithOutDegrees.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
env.execute();
fail("The degree option not set test did not fail");
} catch (Exception e) {
// We expect the job to fail with an exception
}
}
@Test
public void testInDegreesNotSet() throws Exception {
/*
* Test that if the degrees are not set, the in degrees cannot be accessed - an
* exception is thrown.
*/
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
try {
DataSet<Vertex<Long, Long>> verticesWithInDegrees = graph.runVertexCentricIteration(new UpdateFunctionInDegree(),
new DummyMessageFunction(), 5).getVertices();
verticesWithInDegrees.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
env.execute();
fail("The degree option not set test did not fail");
} catch (Exception e) {
// We expect the job to fail with an exception
}
}
@Test
public void testNumVerticesNotSet() throws Exception {
/*
* Test that if the number of vertices option is not set, this number cannot be accessed -
* an exception id thrown.
*/
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
try {
DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
new DummyMessageFunction(), 5).getVertices();
verticesWithNumVertices.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
env.execute();
fail("The num vertices option not set test did not fail");
} catch (Exception e) {
// We expect the job to fail with an exception
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
setNewVertexValue(((VertexWithDegrees)vertex).getOutDegree());
}
}
@SuppressWarnings("serial")
public static final class UpdateFunctionNumVertices extends VertexUpdateFunction<Long, Long, Long> {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
setNewVertexValue(getNumberOfVertices());
}
}
@SuppressWarnings("serial")
public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
@Override
public void sendMessages(Vertex<Long, Long> vertex) {
//send message to keep vertices active
sendMessageToAllNeighbors(vertex.getValue());
}
}
}
......@@ -66,6 +66,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
result.writeAsCsv(resultPath, "\n", " ");
env.execute();
}
/**
* A map function that takes a Long value and creates a 2-tuple out of it:
* <pre>(Long value) -> (value, value)</pre>
......
......@@ -28,7 +28,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.IncrementalSSSPExample;
import org.apache.flink.graph.example.utils.IncrementalSSSPData;
import org.apache.flink.graph.spargel.IterationConfiguration;
import org.apache.flink.graph.spargel.VertexCentricConfiguration;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After;
import org.junit.Before;
......@@ -101,7 +101,7 @@ public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
graph.removeEdge(edgeToBeRemoved);
// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
if(IncrementalSSSPExample.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册